You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/18 08:51:19 UTC
[kylin] branch master updated: KYLIN-4921: New rowkey for streamv2
source config and fix bug that can't create same name table in diff project
(#1604)
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new decb4ab KYLIN-4921: New rowkey for streamv2 source config and fix bug that can't create same name table in diff project (#1604)
decb4ab is described below
commit decb4ab7ed3fa0d8fbf0bfd7544676d139ff8948
Author: Kun Liu <li...@apache.org>
AuthorDate: Thu Mar 18 16:51:08 2021 +0800
KYLIN-4921: New rowkey for streamv2 source config and fix bug that can't create same name table in diff project (#1604)
* support different projects with the same stream name
* fix bug: when creating a duplicate table, the original stream table can't be deleted
---
.../kylin/realtime/BuildCubeWithStreamV2.java | 3 +-
.../rest/controller/StreamingV2Controller.java | 35 +++++++--
.../kylin/rest/controller/TableController.java | 2 +-
.../apache/kylin/rest/service/ProjectService.java | 2 +-
.../kylin/rest/service/StreamingV2Service.java | 12 +--
.../apache/kylin/rest/service/TableService.java | 7 +-
.../stream/core/source/StreamingSourceConfig.java | 32 +++++++-
.../core/source/StreamingSourceConfigManager.java | 88 ++++++++++++++++------
.../kylin/stream/server/StreamingServer.java | 2 +-
.../source/kafka/KafkaBatchSourceAdaptor.java | 2 +-
.../kylin/stream/source/kafka/KafkaSource.java | 8 +-
.../org/apache/kylin/tool/CubeMigrationCLI.java | 15 +++-
.../kylin/tool/extractor/CubeMetaExtractor.java | 17 +----
webapp/app/js/controllers/sourceMeta.js | 7 +-
14 files changed, 168 insertions(+), 64 deletions(-)
diff --git a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
index fb85b26..7f2c446 100644
--- a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
@@ -160,7 +160,8 @@ public class BuildCubeWithStreamV2 extends KylinTestBase {
final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
final String streamingTableName = cubeInstance.getRootFactTable();
- final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(streamingTableName);
+ final String projectName = cubeInstance.getProject();
+ final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(streamingTableName, projectName);
topicName = KafkaSource.getTopicName(sourceConfig.getProperties());
String bootstrapServers = KafkaSource.getBootstrapServers(sourceConfig.getProperties());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
index 0a0cc62..a73a88c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
+import org.apache.directory.api.util.Strings;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -104,10 +105,21 @@ public class StreamingV2Controller extends BasicController {
@RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
@ResponseBody
public List<StreamingSourceConfig> getStreamings(@RequestParam(value = "table", required = false) String table,
+ @RequestParam(value = "project", required = false) String project,
@RequestParam(value = "limit", required = false) Integer limit,
@RequestParam(value = "offset", required = false) Integer offset) {
try {
- return streamingService.getStreamingConfigs(table, limit, offset);
+ // query all streaming config or query one streaming config
+ if (!Strings.isEmpty(table) && !Strings.isEmpty(project)) {
+ // check the table metadata
+ if (tableService.getTableDescByName(table, false, project) == null) {
+ // the table metadata doesn't exist
+ throw new InternalErrorException(String.format(Locale.ROOT,
+ "The table %s of project %s doesn't exist, please make the stream table exists",
+ table, project));
+ }
+ }
+ return streamingService.getStreamingConfigs(table, project, limit, offset);
} catch (IOException e) {
logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
@@ -140,10 +152,15 @@ public class StreamingV2Controller extends BasicController {
try {
try {
tableDesc.setUuid(UUID.randomUUID().toString());
+ if (tableService.getTableDescByName(tableDesc.getIdentity(), false, project) != null) {
+ throw new IOException(String.format(Locale.ROOT,
+ "The table %s of project %s exists",
+ tableDesc.getIdentity(), project));
+ }
tableService.loadTableToProject(tableDesc, null, project);
saveTableSuccess = true;
} catch (IOException e) {
- throw new BadRequestException("Failed to add streaming table.");
+ throw new BadRequestException("Failed to add streaming table, because of " + e.getMessage());
}
try {
streamingSourceConfig.setName(tableDesc.getIdentity());
@@ -159,7 +176,8 @@ public class StreamingV2Controller extends BasicController {
if (!saveTableSuccess || !saveStreamingSuccess) {
if (saveTableSuccess) {
try {
- tableService.unloadHiveTable(tableDesc.getIdentity(), project);
+ // just drop the table metadata and don't drop the stream source config info
+ tableService.unloadHiveTable(tableDesc.getIdentity(), project, false);
} catch (IOException e) {
shouldThrow = new InternalErrorException(
"Action failed and failed to rollback the create table " + e.getLocalizedMessage(), e);
@@ -280,7 +298,7 @@ public class StreamingV2Controller extends BasicController {
final String user = SecurityContextHolder.getContext().getAuthentication().getName();
logger.info("{} try to updateStreamingConfig.", user);
try {
- streamingSourceConfig = streamingService.updateStreamingConfig(streamingSourceConfig);
+ streamingService.updateStreamingConfig(streamingSourceConfig);
} catch (AccessDeniedException accessDeniedException) {
throw new ForbiddenException("You don't have right to update this StreamingSourceConfig.");
} catch (Exception e) {
@@ -292,10 +310,13 @@ public class StreamingV2Controller extends BasicController {
return streamingRequest;
}
- @RequestMapping(value = "/{configName}", method = { RequestMethod.DELETE })
+ @Deprecated
+ @RequestMapping(value = "/{project}/{configName}", method = { RequestMethod.DELETE }, produces = {
+ "application/json" })
@ResponseBody
- public void deleteConfig(@PathVariable String configName) throws IOException {
- StreamingSourceConfig config = streamingService.getStreamingManagerV2().getConfig(configName);
+ public void deleteConfig(@PathVariable String project, @PathVariable String configName) throws IOException {
+ // This method will never be called by the frontend.
+ StreamingSourceConfig config = streamingService.getStreamingManagerV2().getConfig(configName, project);
if (null == config) {
throw new NotFoundException("StreamingSourceConfig with name " + configName + " not found..");
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 6c529b9..2af8f54 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -159,7 +159,7 @@ public class TableController extends BasicController {
try {
for (String tableName : StringUtil.splitByComma(tables)) {
tableACLService.deleteFromTableACLByTbl(project, tableName);
- if (tableService.unloadHiveTable(tableName, project)) {
+ if (tableService.unloadHiveTable(tableName, project, true)) {
unLoadSuccess.add(tableName);
} else {
unLoadFail.add(tableName);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
index 40805a9..c06d56f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
@@ -152,7 +152,7 @@ public class ProjectService extends BasicService {
public void deleteProject(String projectName, ProjectInstance project) throws IOException {
Set<String> tables = project.getTables();
for (String table : Sets.newTreeSet(tables)) {
- tableService.unloadHiveTable(table, projectName);
+ tableService.unloadHiveTable(table, projectName, true);
getTableManager().removeTableExt(table, projectName);
getTableACLManager().deleteTableACLByTbl(projectName, table);
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
index c43d625..3ef0ee2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
@@ -105,12 +105,12 @@ public class StreamingV2Service extends BasicService {
receiverAdminClient = adminClient;
}
- public List<StreamingSourceConfig> listAllStreamingConfigs(final String table) throws IOException {
+ public List<StreamingSourceConfig> listAllStreamingConfigs(final String table, final String projectName) throws IOException {
List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList();
- if (StringUtils.isEmpty(table)) {
+ if (StringUtils.isEmpty(table) || StringUtils.isEmpty(projectName)) {
streamingSourceConfigs = getStreamingManagerV2().listAllStreaming();
} else {
- StreamingSourceConfig config = getStreamingManagerV2().getConfig(table);
+ StreamingSourceConfig config = getStreamingManagerV2().getConfig(table, projectName);
if (config != null) {
streamingSourceConfigs.add(config);
}
@@ -119,10 +119,10 @@ public class StreamingV2Service extends BasicService {
return streamingSourceConfigs;
}
- public List<StreamingSourceConfig> getStreamingConfigs(final String table, final Integer limit, final Integer offset)
+ public List<StreamingSourceConfig> getStreamingConfigs(final String table, final String projectName, final Integer limit, final Integer offset)
throws IOException {
List<StreamingSourceConfig> streamingSourceConfigs;
- streamingSourceConfigs = listAllStreamingConfigs(table);
+ streamingSourceConfigs = listAllStreamingConfigs(table, projectName);
if (limit == null || offset == null) {
return streamingSourceConfigs;
@@ -138,7 +138,7 @@ public class StreamingV2Service extends BasicService {
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
+ " or hasPermission(#project, 'ADMINISTRATION')")
public StreamingSourceConfig createStreamingConfig(StreamingSourceConfig config, ProjectInstance project) throws IOException {
- if (getStreamingManagerV2().getConfig(config.getName()) != null) {
+ if (getStreamingManagerV2().getConfigMustWithProject(config.getName(), config.getProjectName()) != null) {
throw new InternalErrorException("The streamingSourceConfig named " + config.getName() + " already exists");
}
StreamingSourceConfig streamingSourceConfig = getStreamingManagerV2().saveStreamingConfig(config);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 069a460..2d76acc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -284,9 +284,10 @@ public class TableService extends BasicService {
* that's why we have two if statement here.
* @param tableName
* @param project
+ * @param needRemoveStreamInfo
* @return
*/
- public boolean unloadHiveTable(String tableName, String project) throws IOException {
+ public boolean unloadHiveTable(String tableName, String project, boolean needRemoveStreamInfo) throws IOException {
aclEvaluate.checkProjectAdminPermission(project);
Message msg = MsgPicker.getMsg();
@@ -319,7 +320,9 @@ public class TableService extends BasicService {
// remove streaming info
SourceManager sourceManager = SourceManager.getInstance(KylinConfig.getInstanceFromEnv());
ISource source = sourceManager.getCachedSource(desc);
- source.unloadTable(tableName, project);
+ if (!desc.isStreamingTable() || needRemoveStreamInfo) {
+ source.unloadTable(tableName, project);
+ }
return rtn;
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java
index 3147a59..9bb4412 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
+import org.apache.directory.api.util.Strings;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
@@ -32,6 +33,7 @@ import org.apache.kylin.common.persistence.Serializer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
/**
@@ -53,10 +55,28 @@ public class StreamingSourceConfig extends RootPersistentEntity {
@JsonProperty("properties")
private Map<String, String> properties = Maps.newLinkedHashMap();
- public static String concatResourcePath(String name) {
- return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + ".json";
+ @JsonProperty("project_name")
+ private String projectName;
+
+ @Deprecated
+ static String concatResourcePath(String name) {
+ return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + MetadataConstants.FILE_SURFIX;
+ }
+
+ public static String concatResourcePathWithProjName(String name, String projectName) {
+ if (Strings.isEmpty(projectName)) {
+ return concatResourcePath(name);
+ } else {
+ // like table desc
+ return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + "--" + projectName + MetadataConstants.FILE_SURFIX;
+ }
+ }
+
+ public String getResourcePathWithProjName() {
+ return concatResourcePathWithProjName(name, projectName);
}
+ @Deprecated
public String getResourcePath() {
return concatResourcePath(name);
}
@@ -85,6 +105,14 @@ public class StreamingSourceConfig extends RootPersistentEntity {
this.parserInfo = parserInfo;
}
+ public void setProjectName(String projectName) {
+ this.projectName = projectName;
+ }
+
+ public String getProjectName() {
+ return projectName;
+ }
+
@Override
public StreamingSourceConfig clone() {
try {
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java
index 7a0d62b..e619003 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java
@@ -24,6 +24,7 @@ import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
+import org.apache.directory.api.util.Strings;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
@@ -83,7 +84,6 @@ public class StreamingSourceConfigManager {
ResourceStore store = getStore();
logger.info("Load all streaming metadata from folder "
+ store.getReadableResourcePath(ResourceStore.STREAMING_V2_RESOURCE_ROOT));
-
List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_V2_RESOURCE_ROOT,
MetadataConstants.FILE_SURFIX);
for (String path : paths) {
@@ -94,7 +94,10 @@ public class StreamingSourceConfigManager {
logger.error("Error loading streaming desc " + path, e);
continue;
}
- if (path.equals(streamingSourceConfig.getResourcePath()) == false) {
+ // check path without project name
+ // check path with project name
+ if (path.equals(streamingSourceConfig.getResourcePath()) == false &&
+ path.equals(streamingSourceConfig.getResourcePathWithProjName()) == false) {
logger.error("Skip suspicious desc at " + path + ", " + streamingSourceConfig + " should be at "
+ streamingSourceConfig.getResourcePath());
continue;
@@ -113,26 +116,62 @@ public class StreamingSourceConfigManager {
* @param name
* @throws IOException
*/
- public StreamingSourceConfig reloadStreamingConfigLocal(String name) throws IOException {
-
- // Save Source
- String path = StreamingSourceConfig.concatResourcePath(name);
-
+ public StreamingSourceConfig reloadStreamingConfigLocal(String name, String projectName) throws IOException {
+ if (Strings.isEmpty(name) || Strings.isEmpty(projectName)) {
+ throw new StreamingException(String.format(Locale.ROOT,
+ "the table name %s or project name %s is null", name, projectName));
+ }
+ // path with project name
+ String path = StreamingSourceConfig.concatResourcePathWithProjName(name, projectName);
// Reload the StreamingSourceConfig
- StreamingSourceConfig ndesc = loadStreamingConfigAt(path);
- return ndesc;
+ StreamingSourceConfig config = loadStreamingConfigAt(path);
+ if (config == null) {
+ // the path with project name doesn't contain the source config, and check the old path without project name.
+ path = StreamingSourceConfig.concatResourcePath(name);
+ config = loadStreamingConfigAt(path);
+ if (config != null) {
+ config.setProjectName(projectName);
+ // remove from the old path, and save the source config to the new path
+ removeStreamingConfig(config);
+ saveStreamingConfig(config);
+ }
+ }
+ return config;
}
// remove streamingSourceConfig
public void removeStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException {
- String path = streamingSourceConfig.getResourcePath();
- getStore().deleteResource(path);
+ // path with project name
+ String path = streamingSourceConfig.getResourcePathWithProjName();
+ if (loadStreamingConfigAt(path) != null) {
+ getStore().deleteResource(path);
+ } else {
+ // The source is stored in the old path which is prefix + table name + suffix
+ path = streamingSourceConfig.getResourcePath();
+ getStore().deleteResource(path);
+ }
+ }
+
+ public StreamingSourceConfig getConfig(String name, String projectName) {
+ name = name.toUpperCase(Locale.ROOT);
+ try {
+ return reloadStreamingConfigLocal(name, projectName);
+ } catch (IOException e) {
+ throw new StreamingException(e);
+ }
}
- public StreamingSourceConfig getConfig(String name) {
+ public StreamingSourceConfig getConfigMustWithProject(String name, String projectName) {
name = name.toUpperCase(Locale.ROOT);
+ if (Strings.isEmpty(name) || Strings.isEmpty(projectName)) {
+ throw new StreamingException(String.format(Locale.ROOT,
+ "the table name %s or project name %s is null", name, projectName));
+ }
+ // path with project name
+ String path = StreamingSourceConfig.concatResourcePathWithProjName(name, projectName);
+ // Reload the StreamingSourceConfig
try {
- return reloadStreamingConfigLocal(name);
+ return loadStreamingConfigAt(path);
} catch (IOException e) {
throw new StreamingException(e);
}
@@ -140,32 +179,35 @@ public class StreamingSourceConfigManager {
/**
*
- * @param desc
+ * @param streamingSourceConfig
* @return
* @throws IOException
*/
- public StreamingSourceConfig updateStreamingConfig(StreamingSourceConfig desc) throws IOException {
+ public StreamingSourceConfig updateStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException {
// Validate CubeDesc
- if (desc.getUuid() == null || desc.getName() == null) {
+ if (streamingSourceConfig.getUuid() == null || streamingSourceConfig.getName() == null) {
throw new IllegalArgumentException("SteamingConfig Illegal.");
}
- // Save Source
- String path = desc.getResourcePath();
- getStore().putResource(path, desc, System.currentTimeMillis(), STREAMING_SERIALIZER);
+ // remove Source
+ removeStreamingConfig(streamingSourceConfig);
+
+ // Save Source, the path with project name
+ String path = streamingSourceConfig.getResourcePathWithProjName();
+ getStore().putResource(path, streamingSourceConfig, System.currentTimeMillis(), STREAMING_SERIALIZER);
// Reload the StreamingSourceConfig
- StreamingSourceConfig ndesc = loadStreamingConfigAt(path);
+ StreamingSourceConfig newStreamingSourceConfig = loadStreamingConfigAt(path);
- return ndesc;
+ return newStreamingSourceConfig;
}
public StreamingSourceConfig saveStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException {
if (streamingSourceConfig == null || StringUtils.isEmpty(streamingSourceConfig.getName())) {
throw new IllegalArgumentException();
}
-
- String path = StreamingSourceConfig.concatResourcePath(streamingSourceConfig.getName());
+ // path = prefix + /table name--project name + suffix
+ String path = streamingSourceConfig.getResourcePathWithProjName();
getStore().putResource(path, streamingSourceConfig, System.currentTimeMillis(),
StreamingSourceConfig.SERIALIZER);
return streamingSourceConfig;
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 7ace4ec..d09d428 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -703,7 +703,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
CubeDescManager.getInstance(kylinConfig).reloadCubeDescLocal(cubeName);
CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).reloadCubeQuietly(cubeName);
StreamingSourceConfigManager.getInstance(kylinConfig).reloadStreamingConfigLocal(
- cubeInstance.getRootFactTable());
+ cubeInstance.getRootFactTable(), cubeInstance.getProject());
}
private String calLocalSegmentCacheDir() {
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java
index 2dc2a9c..4173a5c 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java
@@ -34,7 +34,7 @@ public class KafkaBatchSourceAdaptor extends HiveSource {
public void unloadTable(String tableName, String project) throws IOException {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
StreamingSourceConfigManager sourceConfigManager = StreamingSourceConfigManager.getInstance(kylinConfig);
- StreamingSourceConfig config = sourceConfigManager.getConfig(tableName);
+ StreamingSourceConfig config = sourceConfigManager.getConfig(tableName, project);
if (config == null) {
return;
}
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java
index 7db1961..e2ab584 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java
@@ -76,8 +76,9 @@ public class KafkaSource implements IStreamingSource {
KylinConfig kylinConf = KylinConfig.getInstanceFromEnv();
CubeInstance cube = CubeManager.getInstance(kylinConf).getCube(cubeName);
String streamingTableName = cube.getRootFactTable();
+ String projectName = cube.getProject();
StreamingSourceConfig streamingSourceConfig = StreamingSourceConfigManager.getInstance(kylinConf)
- .getConfig(streamingTableName);
+ .getConfig(streamingTableName, projectName);
String topicName = getTopicName(streamingSourceConfig.getProperties());
Map<String, Object> conf = getKafkaConf(streamingSourceConfig.getProperties(), cube.getConfig());
@@ -145,8 +146,9 @@ public class KafkaSource implements IStreamingSource {
CubeInstance cubeInstance = CubeManager.getInstance(kylinConf).getCube(cubeName);
IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource(cubeInstance);
String streamingName = cubeInstance.getRootFactTable();
+ String projectName = cubeInstance.getProject();
StreamingSourceConfig streamingSourceConfig = StreamingSourceConfigManager.getInstance(kylinConf)
- .getConfig(streamingName);
+ .getConfig(streamingName, projectName);
String topic = getTopicName(streamingSourceConfig.getProperties());
Map<String, Object> conf = getKafkaConf(streamingSourceConfig.getProperties(), cubeInstance.getConfig());
@@ -295,7 +297,7 @@ public class KafkaSource implements IStreamingSource {
CubeInstance cube = CubeManager.getInstance(kylinConf).getCube(cubeName);
String streamingTableName = cube.getRootFactTable();
StreamingSourceConfig streamingSourceConfig = StreamingSourceConfigManager.getInstance(kylinConf)
- .getConfig(streamingTableName);
+ .getConfig(streamingTableName, cube.getProject());
String topicName = getTopicName(streamingSourceConfig.getProperties());
ISourcePosition sourcePosition = new KafkaPosition();
Map<String, Object> conf = getKafkaConf(streamingSourceConfig.getProperties(), cube.getConfig());
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 033a669..0c3d6e9 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -19,6 +19,7 @@
package org.apache.kylin.tool;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -71,6 +72,7 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.stream.core.source.StreamingSourceConfig;
+import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -359,7 +361,18 @@ public class CubeMigrationCLI extends AbstractApplication {
if (cubeDesc.isStreamingCube()) {
// add streaming source config info for streaming cube
- metaResource.add(StreamingSourceConfig.concatResourcePath(cubeDesc.getModel().getRootFactTableName()));
+ String tableName = cubeDesc.getModel().getRootFactTableName();
+ String projectName = cubeDesc.getProject();
+ KylinConfig kylinConf = KylinConfig.getInstanceFromEnv();
+ StreamingSourceConfigManager manager = StreamingSourceConfigManager.getInstance(kylinConf);
+ StreamingSourceConfig sourceConfig = manager.getConfig(tableName, projectName);
+ if (sourceConfig != null) {
+ metaResource.add(sourceConfig.getResourcePathWithProjName());
+ } else {
+ throw new InterruptedIOException(String.format(Locale.ROOT,
+ "The stream source config doesn't exist, the table name: %s, the project name: %s",
+ tableName, projectName));
+ }
}
}
diff --git a/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java
index 599e58d..893365a 100644
--- a/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java
@@ -20,7 +20,6 @@ package org.apache.kylin.tool.extractor;
import java.io.File;
import java.io.IOException;
-import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -444,18 +443,10 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
}
private void addStreamingV2Config(CubeInstance cube) {
- Collection<StreamingSourceConfig> streamingConfigs;
- try {
- streamingConfigs = streamingSourceConfigManager.listAllStreaming();
- } catch (IOException ioe) {
- logger.error("", ioe);
- return;
- }
- for (StreamingSourceConfig streamingConfig : streamingConfigs) {
- if (streamingConfig.getName() != null
- && streamingConfig.getName().equalsIgnoreCase(cube.getRootFactTable())) {
- addRequired(StreamingSourceConfig.concatResourcePath(streamingConfig.getName()));
- }
+ StreamingSourceConfig streamingSourceConfig =
+ streamingSourceConfigManager.getConfig(cube.getRootFactTable(), cube.getProject());
+ if (streamingSourceConfig != null) {
+ addRequired(streamingSourceConfig.getResourcePathWithProjName());
}
}
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index f75a1c4..3ebabbb 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -1105,7 +1105,8 @@ KylinApp
$scope.streamingConfig = {
name: '',
properties: {},
- parser_info: {}
+ parser_info: {},
+ project_name: ''
};
$scope.tableData = {
@@ -1399,6 +1400,7 @@ KylinApp
$scope.streamingConfig.parser_info.ts_parser = $scope.streaming.TSParser;
$scope.streamingConfig.parser_info.ts_pattern = $scope.streaming.TSPattern;
$scope.streamingConfig.parser_info.field_mapping = {};
+ $scope.streamingConfig.project_name = projectName;
$scope.tableData.columns.forEach(function(col) {
if (col.comment) {
$scope.streamingConfig.parser_info.field_mapping[col.name] = col.comment.replace(/\|/g, '.') || ''
@@ -1530,7 +1532,8 @@ KylinApp
if (_.values(tableConfig.streamingSourceType).indexOf($scope.tableModel.selectedSrcTable.source_type) > -1) {
var table = $scope.tableModel.selectedSrcTable;
var streamingName = table.database+"."+table.name;
- StreamingServiceV2.getConfig({table:streamingName}, function (configs) {
+ var projectName = $scope.projectModel.getSelectedProject();
+ StreamingServiceV2.getConfig({table:streamingName, project: projectName}, function (configs) {
$scope.currentStreamingConfig = configs[0];
});
}