You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/18 08:51:19 UTC

[kylin] branch master updated: KYLIN-4921: New rowkey for streamv2 source config and fix bug that can't create same name table in diff project (#1604)

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new decb4ab  KYLIN-4921: New rowkey for streamv2 source config and fix bug that can't create same name table in diff project (#1604)
decb4ab is described below

commit decb4ab7ed3fa0d8fbf0bfd7544676d139ff8948
Author: Kun Liu <li...@apache.org>
AuthorDate: Thu Mar 18 16:51:08 2021 +0800

    KYLIN-4921: New rowkey for streamv2 source config and fix bug that can't create same name table in diff project (#1604)
    
    * support diff project with same stream name
    
    * fix bug: when create duplicated table, the original stream table can't be deleted
---
 .../kylin/realtime/BuildCubeWithStreamV2.java      |  3 +-
 .../rest/controller/StreamingV2Controller.java     | 35 +++++++--
 .../kylin/rest/controller/TableController.java     |  2 +-
 .../apache/kylin/rest/service/ProjectService.java  |  2 +-
 .../kylin/rest/service/StreamingV2Service.java     | 12 +--
 .../apache/kylin/rest/service/TableService.java    |  7 +-
 .../stream/core/source/StreamingSourceConfig.java  | 32 +++++++-
 .../core/source/StreamingSourceConfigManager.java  | 88 ++++++++++++++++------
 .../kylin/stream/server/StreamingServer.java       |  2 +-
 .../source/kafka/KafkaBatchSourceAdaptor.java      |  2 +-
 .../kylin/stream/source/kafka/KafkaSource.java     |  8 +-
 .../org/apache/kylin/tool/CubeMigrationCLI.java    | 15 +++-
 .../kylin/tool/extractor/CubeMetaExtractor.java    | 17 +----
 webapp/app/js/controllers/sourceMeta.js            |  7 +-
 14 files changed, 168 insertions(+), 64 deletions(-)

diff --git a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
index fb85b26..7f2c446 100644
--- a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
+++ b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java
@@ -160,7 +160,8 @@ public class BuildCubeWithStreamV2 extends KylinTestBase {
 
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
         final String streamingTableName = cubeInstance.getRootFactTable();
-        final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(streamingTableName);
+        final String projectName = cubeInstance.getProject();
+        final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(streamingTableName, projectName);
 
         topicName = KafkaSource.getTopicName(sourceConfig.getProperties());
         String bootstrapServers = KafkaSource.getBootstrapServers(sourceConfig.getProperties());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
index 0a0cc62..a73a88c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.UUID;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.directory.api.util.Strings;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -104,10 +105,21 @@ public class StreamingV2Controller extends BasicController {
     @RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
     @ResponseBody
     public List<StreamingSourceConfig> getStreamings(@RequestParam(value = "table", required = false) String table,
+            @RequestParam(value = "project", required = false) String project,
             @RequestParam(value = "limit", required = false) Integer limit,
             @RequestParam(value = "offset", required = false) Integer offset) {
         try {
-            return streamingService.getStreamingConfigs(table, limit, offset);
+            // query all streaming config or query one streaming config
+            if (!Strings.isEmpty(table) && !Strings.isEmpty(project)) {
+                // check the table metadata
+                if (tableService.getTableDescByName(table, false, project) == null) {
+                    // the table metadata doesn't exist
+                    throw new InternalErrorException(String.format(Locale.ROOT,
+                            "The table %s of project %s doesn't exist, please make the stream table exists",
+                            table, project));
+                }
+            }
+            return streamingService.getStreamingConfigs(table, project, limit, offset);
         } catch (IOException e) {
             logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
             throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
@@ -140,10 +152,15 @@ public class StreamingV2Controller extends BasicController {
         try {
             try {
                 tableDesc.setUuid(UUID.randomUUID().toString());
+                if (tableService.getTableDescByName(tableDesc.getIdentity(), false, project) != null) {
+                    throw new IOException(String.format(Locale.ROOT,
+                            "The table %s of project %s exists",
+                            tableDesc.getIdentity(), project));
+                }
                 tableService.loadTableToProject(tableDesc, null, project);
                 saveTableSuccess = true;
             } catch (IOException e) {
-                throw new BadRequestException("Failed to add streaming table.");
+                throw new BadRequestException("Failed to add streaming table, because of " + e.getMessage());
             }
             try {
                 streamingSourceConfig.setName(tableDesc.getIdentity());
@@ -159,7 +176,8 @@ public class StreamingV2Controller extends BasicController {
             if (!saveTableSuccess || !saveStreamingSuccess) {
                 if (saveTableSuccess) {
                     try {
-                        tableService.unloadHiveTable(tableDesc.getIdentity(), project);
+                        // just drop the table metadata and don't drop the stream source config info
+                        tableService.unloadHiveTable(tableDesc.getIdentity(), project, false);
                     } catch (IOException e) {
                         shouldThrow = new InternalErrorException(
                                 "Action failed and failed to rollback the create table " + e.getLocalizedMessage(), e);
@@ -280,7 +298,7 @@ public class StreamingV2Controller extends BasicController {
         final String user = SecurityContextHolder.getContext().getAuthentication().getName();
         logger.info("{} try to updateStreamingConfig.", user);
         try {
-            streamingSourceConfig = streamingService.updateStreamingConfig(streamingSourceConfig);
+            streamingService.updateStreamingConfig(streamingSourceConfig);
         } catch (AccessDeniedException accessDeniedException) {
             throw new ForbiddenException("You don't have right to update this StreamingSourceConfig.");
         } catch (Exception e) {
@@ -292,10 +310,13 @@ public class StreamingV2Controller extends BasicController {
         return streamingRequest;
     }
 
-    @RequestMapping(value = "/{configName}", method = { RequestMethod.DELETE })
+    @Deprecated
+    @RequestMapping(value = "/{project}/{configName}", method = { RequestMethod.DELETE }, produces = {
+            "application/json" })
     @ResponseBody
-    public void deleteConfig(@PathVariable String configName) throws IOException {
-        StreamingSourceConfig config = streamingService.getStreamingManagerV2().getConfig(configName);
+    public void deleteConfig(@PathVariable String project, @PathVariable String configName) throws IOException {
+        // This method will never be called by the frontend.
+        StreamingSourceConfig config = streamingService.getStreamingManagerV2().getConfig(configName, project);
         if (null == config) {
             throw new NotFoundException("StreamingSourceConfig with name " + configName + " not found..");
         }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 6c529b9..2af8f54 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -159,7 +159,7 @@ public class TableController extends BasicController {
         try {
             for (String tableName : StringUtil.splitByComma(tables)) {
                 tableACLService.deleteFromTableACLByTbl(project, tableName);
-                if (tableService.unloadHiveTable(tableName, project)) {
+                if (tableService.unloadHiveTable(tableName, project, true)) {
                     unLoadSuccess.add(tableName);
                 } else {
                     unLoadFail.add(tableName);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
index 40805a9..c06d56f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
@@ -152,7 +152,7 @@ public class ProjectService extends BasicService {
     public void deleteProject(String projectName, ProjectInstance project) throws IOException {
         Set<String> tables = project.getTables();
         for (String table : Sets.newTreeSet(tables)) {
-            tableService.unloadHiveTable(table, projectName);
+            tableService.unloadHiveTable(table, projectName, true);
             getTableManager().removeTableExt(table, projectName);
             getTableACLManager().deleteTableACLByTbl(projectName, table);
         }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
index c43d625..3ef0ee2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
@@ -105,12 +105,12 @@ public class StreamingV2Service extends BasicService {
         receiverAdminClient = adminClient;
     }
 
-    public List<StreamingSourceConfig> listAllStreamingConfigs(final String table) throws IOException {
+    public List<StreamingSourceConfig> listAllStreamingConfigs(final String table, final String projectName) throws IOException {
         List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList();
-        if (StringUtils.isEmpty(table)) {
+        if (StringUtils.isEmpty(table) || StringUtils.isEmpty(projectName)) {
             streamingSourceConfigs = getStreamingManagerV2().listAllStreaming();
         } else {
-            StreamingSourceConfig config = getStreamingManagerV2().getConfig(table);
+            StreamingSourceConfig config = getStreamingManagerV2().getConfig(table, projectName);
             if (config != null) {
                 streamingSourceConfigs.add(config);
             }
@@ -119,10 +119,10 @@ public class StreamingV2Service extends BasicService {
         return streamingSourceConfigs;
     }
 
-    public List<StreamingSourceConfig> getStreamingConfigs(final String table, final Integer limit, final Integer offset)
+    public List<StreamingSourceConfig> getStreamingConfigs(final String table, final String projectName, final Integer limit, final Integer offset)
             throws IOException {
         List<StreamingSourceConfig> streamingSourceConfigs;
-        streamingSourceConfigs = listAllStreamingConfigs(table);
+        streamingSourceConfigs = listAllStreamingConfigs(table, projectName);
 
         if (limit == null || offset == null) {
             return streamingSourceConfigs;
@@ -138,7 +138,7 @@ public class StreamingV2Service extends BasicService {
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN
             + " or hasPermission(#project, 'ADMINISTRATION')")
     public StreamingSourceConfig createStreamingConfig(StreamingSourceConfig config, ProjectInstance project) throws IOException {
-        if (getStreamingManagerV2().getConfig(config.getName()) != null) {
+        if (getStreamingManagerV2().getConfigMustWithProject(config.getName(), config.getProjectName()) != null) {
             throw new InternalErrorException("The streamingSourceConfig named " + config.getName() + " already exists");
         }
         StreamingSourceConfig streamingSourceConfig = getStreamingManagerV2().saveStreamingConfig(config);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 069a460..2d76acc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -284,9 +284,10 @@ public class TableService extends BasicService {
      * that's why we have two if statement here.
      * @param tableName
      * @param project
+     * @param  needRemoveStreamInfo
      * @return
      */
-    public boolean unloadHiveTable(String tableName, String project) throws IOException {
+    public boolean unloadHiveTable(String tableName, String project, boolean needRemoveStreamInfo) throws IOException {
         aclEvaluate.checkProjectAdminPermission(project);
         Message msg = MsgPicker.getMsg();
 
@@ -319,7 +320,9 @@ public class TableService extends BasicService {
         // remove streaming info
         SourceManager sourceManager = SourceManager.getInstance(KylinConfig.getInstanceFromEnv());
         ISource source = sourceManager.getCachedSource(desc);
-        source.unloadTable(tableName, project);
+        if (!desc.isStreamingTable() || needRemoveStreamInfo) {
+            source.unloadTable(tableName, project);
+        }
         return rtn;
     }
 
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java
index 3147a59..9bb4412 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.directory.api.util.Strings;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
@@ -32,6 +33,7 @@ import org.apache.kylin.common.persistence.Serializer;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 
 /**
@@ -53,10 +55,28 @@ public class StreamingSourceConfig extends RootPersistentEntity {
     @JsonProperty("properties")
     private Map<String, String> properties = Maps.newLinkedHashMap();
 
-    public static String concatResourcePath(String name) {
-        return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + ".json";
+    @JsonProperty("project_name")
+    private String projectName;
+
+    @Deprecated
+    static String concatResourcePath(String name) {
+        return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + MetadataConstants.FILE_SURFIX;
+    }
+
+    public static String concatResourcePathWithProjName(String name, String projectName) {
+        if (Strings.isEmpty(projectName)) {
+            return concatResourcePath(name);
+        } else {
+            // like table desc
+            return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + "--" + projectName + MetadataConstants.FILE_SURFIX;
+        }
+    }
+
+    public String getResourcePathWithProjName() {
+        return concatResourcePathWithProjName(name, projectName);
     }
 
+    @Deprecated
     public String getResourcePath() {
         return concatResourcePath(name);
     }
@@ -85,6 +105,14 @@ public class StreamingSourceConfig extends RootPersistentEntity {
         this.parserInfo = parserInfo;
     }
 
+    public void setProjectName(String projectName) {
+        this.projectName = projectName;
+    }
+
+    public String getProjectName() {
+        return projectName;
+    }
+
     @Override
     public StreamingSourceConfig clone() {
         try {
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java
index 7a0d62b..e619003 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java
@@ -24,6 +24,7 @@ import java.util.Locale;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.directory.api.util.Strings;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -83,7 +84,6 @@ public class StreamingSourceConfigManager {
         ResourceStore store = getStore();
         logger.info("Load all streaming metadata from folder "
                 + store.getReadableResourcePath(ResourceStore.STREAMING_V2_RESOURCE_ROOT));
-
         List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_V2_RESOURCE_ROOT,
                 MetadataConstants.FILE_SURFIX);
         for (String path : paths) {
@@ -94,7 +94,10 @@ public class StreamingSourceConfigManager {
                 logger.error("Error loading streaming desc " + path, e);
                 continue;
             }
-            if (path.equals(streamingSourceConfig.getResourcePath()) == false) {
+            // check path without project name
+            // check path with project name
+            if (path.equals(streamingSourceConfig.getResourcePath()) == false &&
+                    path.equals(streamingSourceConfig.getResourcePathWithProjName()) == false) {
                 logger.error("Skip suspicious desc at " + path + ", " + streamingSourceConfig + " should be at "
                         + streamingSourceConfig.getResourcePath());
                 continue;
@@ -113,26 +116,62 @@ public class StreamingSourceConfigManager {
      * @param name
      * @throws IOException
      */
-    public StreamingSourceConfig reloadStreamingConfigLocal(String name) throws IOException {
-
-        // Save Source
-        String path = StreamingSourceConfig.concatResourcePath(name);
-
+    public StreamingSourceConfig reloadStreamingConfigLocal(String name, String projectName) throws IOException {
+        if (Strings.isEmpty(name) || Strings.isEmpty(projectName)) {
+            throw new StreamingException(String.format(Locale.ROOT,
+                    "the table name %s or project name %s is null", name, projectName));
+        }
+        // path with project name
+        String path = StreamingSourceConfig.concatResourcePathWithProjName(name, projectName);
         // Reload the StreamingSourceConfig
-        StreamingSourceConfig ndesc = loadStreamingConfigAt(path);
-        return ndesc;
+        StreamingSourceConfig config = loadStreamingConfigAt(path);
+        if (config == null) {
+            // the path with project name doesn't contain the source config, and check the old path without project name.
+            path = StreamingSourceConfig.concatResourcePath(name);
+            config = loadStreamingConfigAt(path);
+            if (config != null) {
+                config.setProjectName(projectName);
+                // remove from the old path, and save the source config to the new path
+                removeStreamingConfig(config);
+                saveStreamingConfig(config);
+            }
+        }
+        return config;
     }
 
     // remove streamingSourceConfig
     public void removeStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException {
-        String path = streamingSourceConfig.getResourcePath();
-        getStore().deleteResource(path);
+        // path with project name
+        String path = streamingSourceConfig.getResourcePathWithProjName();
+        if (loadStreamingConfigAt(path) != null) {
+            getStore().deleteResource(path);
+        } else {
+            // The source is stored in the old path which is prefix + table name + suffix
+            path = streamingSourceConfig.getResourcePath();
+            getStore().deleteResource(path);
+        }
+    }
+
+    public StreamingSourceConfig getConfig(String name, String projectName) {
+        name = name.toUpperCase(Locale.ROOT);
+        try {
+            return reloadStreamingConfigLocal(name, projectName);
+        } catch (IOException e) {
+            throw new StreamingException(e);
+        }
     }
 
-    public StreamingSourceConfig getConfig(String name) {
+    public StreamingSourceConfig getConfigMustWithProject(String name, String projectName) {
         name = name.toUpperCase(Locale.ROOT);
+        if (Strings.isEmpty(name) || Strings.isEmpty(projectName)) {
+            throw new StreamingException(String.format(Locale.ROOT,
+                    "the table name %s or project name %s is null", name, projectName));
+        }
+        // path with project name
+        String path = StreamingSourceConfig.concatResourcePathWithProjName(name, projectName);
+        // Reload the StreamingSourceConfig
         try {
-            return reloadStreamingConfigLocal(name);
+            return loadStreamingConfigAt(path);
         } catch (IOException e) {
             throw new StreamingException(e);
         }
@@ -140,32 +179,35 @@ public class StreamingSourceConfigManager {
 
     /**
      *
-     * @param desc
+     * @param streamingSourceConfig
      * @return
      * @throws IOException
      */
-    public StreamingSourceConfig updateStreamingConfig(StreamingSourceConfig desc) throws IOException {
+    public StreamingSourceConfig updateStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException {
         // Validate CubeDesc
-        if (desc.getUuid() == null || desc.getName() == null) {
+        if (streamingSourceConfig.getUuid() == null || streamingSourceConfig.getName() == null) {
             throw new IllegalArgumentException("SteamingConfig Illegal.");
         }
 
-        // Save Source
-        String path = desc.getResourcePath();
-        getStore().putResource(path, desc, System.currentTimeMillis(), STREAMING_SERIALIZER);
+        // remove Source
+        removeStreamingConfig(streamingSourceConfig);
+
+        // Save Source, the path with project name
+        String path = streamingSourceConfig.getResourcePathWithProjName();
+        getStore().putResource(path, streamingSourceConfig, System.currentTimeMillis(), STREAMING_SERIALIZER);
 
         // Reload the StreamingSourceConfig
-        StreamingSourceConfig ndesc = loadStreamingConfigAt(path);
+        StreamingSourceConfig newStreamingSourceConfig = loadStreamingConfigAt(path);
 
-        return ndesc;
+        return newStreamingSourceConfig;
     }
 
     public StreamingSourceConfig saveStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException {
         if (streamingSourceConfig == null || StringUtils.isEmpty(streamingSourceConfig.getName())) {
             throw new IllegalArgumentException();
         }
-
-        String path = StreamingSourceConfig.concatResourcePath(streamingSourceConfig.getName());
+        // path = prefix + /table name---project name + suffix
+        String path = streamingSourceConfig.getResourcePathWithProjName();
         getStore().putResource(path, streamingSourceConfig, System.currentTimeMillis(),
                 StreamingSourceConfig.SERIALIZER);
         return streamingSourceConfig;
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 7ace4ec..d09d428 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -703,7 +703,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
         CubeDescManager.getInstance(kylinConfig).reloadCubeDescLocal(cubeName);
         CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).reloadCubeQuietly(cubeName);
         StreamingSourceConfigManager.getInstance(kylinConfig).reloadStreamingConfigLocal(
-                cubeInstance.getRootFactTable());
+                cubeInstance.getRootFactTable(), cubeInstance.getProject());
     }
 
     private String calLocalSegmentCacheDir() {
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java
index 2dc2a9c..4173a5c 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java
@@ -34,7 +34,7 @@ public class KafkaBatchSourceAdaptor extends HiveSource {
     public void unloadTable(String tableName, String project) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         StreamingSourceConfigManager sourceConfigManager = StreamingSourceConfigManager.getInstance(kylinConfig);
-        StreamingSourceConfig config = sourceConfigManager.getConfig(tableName);
+        StreamingSourceConfig config = sourceConfigManager.getConfig(tableName, project);
         if (config == null) {
             return;
         }
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java
index 7db1961..e2ab584 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java
@@ -76,8 +76,9 @@ public class KafkaSource implements IStreamingSource {
         KylinConfig kylinConf = KylinConfig.getInstanceFromEnv();
         CubeInstance cube = CubeManager.getInstance(kylinConf).getCube(cubeName);
         String streamingTableName = cube.getRootFactTable();
+        String projectName = cube.getProject();
         StreamingSourceConfig streamingSourceConfig = StreamingSourceConfigManager.getInstance(kylinConf)
-                .getConfig(streamingTableName);
+                .getConfig(streamingTableName, projectName);
 
         String topicName = getTopicName(streamingSourceConfig.getProperties());
         Map<String, Object> conf = getKafkaConf(streamingSourceConfig.getProperties(), cube.getConfig());
@@ -145,8 +146,9 @@ public class KafkaSource implements IStreamingSource {
             CubeInstance cubeInstance = CubeManager.getInstance(kylinConf).getCube(cubeName);
             IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource(cubeInstance);
             String streamingName = cubeInstance.getRootFactTable();
+            String projectName = cubeInstance.getProject();
             StreamingSourceConfig streamingSourceConfig = StreamingSourceConfigManager.getInstance(kylinConf)
-                    .getConfig(streamingName);
+                    .getConfig(streamingName, projectName);
             String topic = getTopicName(streamingSourceConfig.getProperties());
             Map<String, Object> conf = getKafkaConf(streamingSourceConfig.getProperties(), cubeInstance.getConfig());
 
@@ -295,7 +297,7 @@ public class KafkaSource implements IStreamingSource {
         CubeInstance cube = CubeManager.getInstance(kylinConf).getCube(cubeName);
         String streamingTableName = cube.getRootFactTable();
         StreamingSourceConfig streamingSourceConfig = StreamingSourceConfigManager.getInstance(kylinConf)
-                .getConfig(streamingTableName);
+                .getConfig(streamingTableName, cube.getProject());
         String topicName = getTopicName(streamingSourceConfig.getProperties());
         ISourcePosition sourcePosition = new KafkaPosition();
         Map<String, Object> conf = getKafkaConf(streamingSourceConfig.getProperties(), cube.getConfig());
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 033a669..0c3d6e9 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.tool;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -71,6 +72,7 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.stream.core.source.StreamingSourceConfig;
+import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -359,7 +361,18 @@ public class CubeMigrationCLI extends AbstractApplication {
 
         if (cubeDesc.isStreamingCube()) {
             // add streaming source config info for streaming cube
-            metaResource.add(StreamingSourceConfig.concatResourcePath(cubeDesc.getModel().getRootFactTableName()));
+            String tableName = cubeDesc.getModel().getRootFactTableName();
+            String projectName = cubeDesc.getProject();
+            KylinConfig kylinConf = KylinConfig.getInstanceFromEnv();
+            StreamingSourceConfigManager manager = StreamingSourceConfigManager.getInstance(kylinConf);
+            StreamingSourceConfig sourceConfig = manager.getConfig(tableName, projectName);
+            if (sourceConfig != null) {
+                metaResource.add(sourceConfig.getResourcePathWithProjName());
+            } else {
+                throw new InterruptedIOException(String.format(Locale.ROOT,
+                        "The stream source config doesn't exist, the table name: %s, the project name: %s",
+                        tableName, projectName));
+            }
         }
     }
 
diff --git a/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java
index 599e58d..893365a 100644
--- a/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java
@@ -20,7 +20,6 @@ package org.apache.kylin.tool.extractor;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -444,18 +443,10 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
     }
 
     private void addStreamingV2Config(CubeInstance cube) {
-        Collection<StreamingSourceConfig> streamingConfigs;
-        try {
-            streamingConfigs = streamingSourceConfigManager.listAllStreaming();
-        } catch (IOException ioe) {
-            logger.error("", ioe);
-            return;
-        }
-        for (StreamingSourceConfig streamingConfig : streamingConfigs) {
-            if (streamingConfig.getName() != null
-                    && streamingConfig.getName().equalsIgnoreCase(cube.getRootFactTable())) {
-                addRequired(StreamingSourceConfig.concatResourcePath(streamingConfig.getName()));
-            }
+        StreamingSourceConfig streamingSourceConfig =
+                streamingSourceConfigManager.getConfig(cube.getRootFactTable(), cube.getProject());
+        if (streamingSourceConfig != null) {
+            addRequired(streamingSourceConfig.getResourcePathWithProjName());
         }
     }
 
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index f75a1c4..3ebabbb 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -1105,7 +1105,8 @@ KylinApp
       $scope.streamingConfig = {
         name: '',
         properties: {},
-        parser_info: {}
+        parser_info: {},
+        project_name: ''
       };
 
       $scope.tableData = {
@@ -1399,6 +1400,7 @@ KylinApp
         $scope.streamingConfig.parser_info.ts_parser = $scope.streaming.TSParser;
         $scope.streamingConfig.parser_info.ts_pattern = $scope.streaming.TSPattern;
         $scope.streamingConfig.parser_info.field_mapping = {};
+        $scope.streamingConfig.project_name = projectName;
         $scope.tableData.columns.forEach(function(col) {
           if (col.comment) {
             $scope.streamingConfig.parser_info.field_mapping[col.name] = col.comment.replace(/\|/g, '.') || ''
@@ -1530,7 +1532,8 @@ KylinApp
       if (_.values(tableConfig.streamingSourceType).indexOf($scope.tableModel.selectedSrcTable.source_type) > -1) {
         var table = $scope.tableModel.selectedSrcTable;
         var streamingName = table.database+"."+table.name;
-        StreamingServiceV2.getConfig({table:streamingName}, function (configs) {
+        var projectName = $scope.projectModel.getSelectedProject();
+        StreamingServiceV2.getConfig({table:streamingName, project: projectName}, function (configs) {
           $scope.currentStreamingConfig = configs[0];
         });
       }