You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/05/31 00:17:32 UTC

[kylin] branch master updated: add more check rules for migration streaming cube

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4572cf4  add more check rules for migration streaming cube
4572cf4 is described below

commit 4572cf488c20c383b8f5f6e11bf395c1fde4cb49
Author: kliu3 <li...@apache.org>
AuthorDate: Wed Apr 7 15:55:21 2021 +0800

    add more check rules for migration streaming cube
---
 .../apache/kylin/common/restclient/RestClient.java |  4 ++
 .../apache/kylin/metadata/model/ColumnDesc.java    |  4 ++
 .../org/apache/kylin/metadata/model/TableDesc.java |  4 ++
 .../kylin/rest/controller/MigrationController.java | 42 +++++++++++++++
 .../kylin/rest/service/MigrationRuleSet.java       | 60 ++++++++++++++++++++--
 .../kylin/rest/service/MigrationService.java       |  2 +-
 .../apache/kylin/rest/service/BasicService.java    |  5 ++
 .../kylin/rest/service/StreamingV2Service.java     | 22 ++++++++
 .../rest/service/TableSchemaUpdateChecker.java     | 38 +++++++++++++-
 .../apache/kylin/rest/service/TableService.java    | 12 ++++-
 server/src/main/resources/kylinSecurity.xml        |  2 +
 .../stream/core/source/MessageParserInfo.java      | 17 ++++++
 .../org/apache/kylin/tool/CubeMigrationCLI.java    | 13 ++++-
 .../StreamTableCompatibilityCheckRequest.java      | 49 ++++++++++++++++++
 14 files changed, 266 insertions(+), 8 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index fcd8706..a054c42 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -393,6 +393,10 @@ public class RestClient {
         checkCompatibility(jsonRequest, baseUrl + "/cubes/checkCompatibility");
     }
 
+    public void checkStreamTableCompatibility(String jsonRequest) throws IOException {
+        checkCompatibility(jsonRequest, baseUrl+"/cubes/checkStreamTableCompatibility");
+    }
+
     private void checkCompatibility(String jsonRequest, String url) throws IOException {
         HttpPost post = newPost(url);
         try {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index 455f586..7ec5bb3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.relaxng.datatype.Datatype;
 
 /**
  * Column Metadata from Source. All name should be uppercase.
@@ -158,6 +159,9 @@ public class ColumnDesc implements Serializable {
     }
 
     public DataType getType() {
+        if (type == null && datatype != null) {
+            this.type = DataType.getType(datatype);
+        }
         return type;
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index d99ff54..9f701dc 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -456,4 +456,8 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         return false;
     }
 
+    public boolean isLambdaTable() {
+        return sourceType == ISourceAware.ID_KAFKA_HIVE;
+    }
+
 }
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
index 45f83ea..58860c5 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
@@ -35,8 +35,11 @@ import org.apache.kylin.rest.service.MigrationRuleSet;
 import org.apache.kylin.rest.service.MigrationService;
 import org.apache.kylin.rest.service.ModelService;
 import org.apache.kylin.rest.service.QueryService;
+import org.apache.kylin.rest.service.StreamingV2Service;
 import org.apache.kylin.rest.service.TableService;
+import org.apache.kylin.stream.core.source.StreamingSourceConfig;
 import org.apache.kylin.tool.migration.CompatibilityCheckRequest;
+import org.apache.kylin.tool.migration.StreamTableCompatibilityCheckRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -73,6 +76,9 @@ public class MigrationController extends BasicController {
     @Autowired
     private TableService tableService;
 
+    @Autowired
+    private StreamingV2Service streamingV2Service;
+
     private final String targetHost = KylinConfig.getInstanceFromEnv().getMigrationTargetAddress();
 
     private CubeInstance getCubeInstance(String cubeName) {
@@ -140,6 +146,38 @@ public class MigrationController extends BasicController {
         return Strings.isNullOrEmpty(targetHost) ? this.targetHost : targetHost;
     }
 
+    /**
+     * Checks that the stream table desc and the streaming source config carried by the request
+     * are compatible with the requested project; any failure is rethrown as a ConflictException.
+     */
+    @RequestMapping(value = "/checkStreamTableCompatibility", method = { RequestMethod.POST })
+    @ResponseBody
+    public void checkStreamTableCompatibility(@RequestBody StreamTableCompatibilityCheckRequest request) {
+        try {
+            // log the project that is actually checked, not the one recorded in the table desc
+            String project = request.getProjectName();
+            TableDesc tableDesc = JsonUtil.readValue(request.getTableDesc(), TableDesc.class);
+            if (tableDesc == null) {
+                throw new IllegalArgumentException("The table desc of the request is empty");
+            }
+            logger.info("Stream table compatibility check for table {}, project {}", tableDesc.getName(), project);
+            tableService.checkStreamTableCompatibility(project, tableDesc);
+            logger.info("Pass stream table compatibility check for table {}, project {}", tableDesc.getName(), project);
+
+            // check stream source config
+            StreamingSourceConfig config = JsonUtil.readValue(request.getStreamSource(), StreamingSourceConfig.class);
+            if (config == null) {
+                throw new IllegalArgumentException("The stream source config of the request is empty");
+            }
+            logger.info("Stream source config compatibility check for table {}, project {}", tableDesc.getName(), project);
+            streamingV2Service.checkStreamingSourceCompatibility(project, config);
+            logger.info("Pass stream source config compatibility check for table {}, project {}", tableDesc.getName(), project);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new ConflictException(e.getMessage(), e);
+        }
+    }
+
     /**
      * Check the schema compatibility for table, model desc
      */
@@ -199,4 +237,8 @@ public class MigrationController extends BasicController {
         }
         return result;
     }
+
+    private boolean isStreamingTable(CubeInstance cube) {
+        return cube.getDescriptor().getModel().getRootFactTable().getTableDesc().isStreamingTable();
+    }
 }
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java
index de158bd..ab67643 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java
@@ -55,7 +55,10 @@ import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.stream.core.source.StreamingSourceConfig;
+import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
 import org.apache.kylin.tool.migration.CompatibilityCheckRequest;
+import org.apache.kylin.tool.migration.StreamTableCompatibilityCheckRequest;
 import org.apache.kylin.tool.query.QueryGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,6 +89,7 @@ public class MigrationRuleSet {
     public static final Rule DEFAULT_SEGMENT_RULE = new SegmentRule();
     public static final Rule DEFAULT_CUBE_OVERWRITE_RULE = new CubeOverwriteRule();
     public static final Rule DEFAULT_QUERY_LATENCY_RULE = new QueryLatencyRule();
+    public static final Rule DEFAULT_STREAM_TABLE_CHECK_RULE = new StreamTableCompatibilityRule();
 
     private static List<Rule> MUSTTOPASS_RULES = Lists.newLinkedList();
 
@@ -114,7 +118,9 @@ public class MigrationRuleSet {
     // initialize default rules
     static {
         register(DEFAULT_HIVE_TABLE_CONSISTENCY_RULE, DEFAULT_CUBE_STATUS_RULE, DEFAULT_PROJECT_EXIST_RULE,
-                DEFAULT_EMAIL_NOTIFY_RULE, DEFAULT_SEGMENT_RULE, DEFAULT_CUBE_OVERWRITE_RULE, DEFAULT_COMPATIBLE_RULE);
+                 DEFAULT_EMAIL_NOTIFY_RULE, DEFAULT_SEGMENT_RULE, DEFAULT_CUBE_OVERWRITE_RULE,
+                 DEFAULT_COMPATIBLE_RULE, DEFAULT_STREAM_TABLE_CHECK_RULE);
+
         register(false, DEFAULT_AUTO_MERGE_RULE, DEFAULT_EXPANSION_RULE, DEFAULT_QUERY_LATENCY_RULE);
     }
 
@@ -369,6 +375,17 @@ public class MigrationRuleSet {
 
         @Override
         public void apply(Context ctx) throws RuleValidationException {
+            // check table type
+            CubeInstance cube = ctx.getCubeInstance();
+            boolean isStreamTable = cube.getDescriptor().getModel().getRootFactTable().getTableDesc().isStreamingTable();
+            if (isStreamTable) {
+                // check lambda
+                if (!cube.getDescriptor().getModel().getRootFactTable().getTableDesc().isLambdaTable()) {
+                    // streaming table without lambda doesn't need to check hive table
+                    return;
+                }
+            }
+
             // de-dup
             SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
             for (TableRef tableRef : ctx.getCubeInstance().getModel().getAllTables()) {
@@ -400,7 +417,8 @@ public class MigrationRuleSet {
             // do schema check
             KylinConfig config = KylinConfig.getInstanceFromEnv();
             TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(TableMetadataManager.getInstance(config),
-                    CubeManager.getInstance(config), DataModelManager.getInstance(config));
+                CubeManager.getInstance(config), DataModelManager.getInstance(config),
+                StreamingSourceConfigManager.getInstance(config));
             for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
                 try {
                     TableSchemaUpdateChecker.CheckResult result = checker.allowReload(pair.getFirst(),
@@ -417,6 +435,42 @@ public class MigrationRuleSet {
 
     }
 
+    /** Rule asking the target server to validate a streaming fact table and its source config. */
+    private static class StreamTableCompatibilityRule implements Rule {
+        @Override
+        public void apply(Context ctx) throws RuleValidationException {
+            try {
+                checkStreamTableSchema(ctx);
+            } catch (IOException e) {
+                throw new RuleValidationException(e.getMessage(), e);
+            }
+        }
+
+        /** Does nothing unless the root fact table of the cube is a streaming table. */
+        public void checkStreamTableSchema(Context ctx) throws IOException {
+            CubeInstance cube = ctx.getCubeInstance();
+            TableDesc tableDesc = cube.getModel().getRootFactTable().getTableDesc();
+            if (!tableDesc.isStreamingTable()) {
+                return;
+            }
+            logger.info("check the stream table schema, cubename {}, project {}, lambda {}",
+                    cube.getName(), cube.getProject(), tableDesc.isLambdaTable());
+
+            StreamingSourceConfig streamingSourceConfig = StreamingSourceConfigManager
+                    .getInstance(cube.getConfig()).getConfig(tableDesc.getIdentity(), tableDesc.getProject());
+            if (streamingSourceConfig == null) {
+                // fail here with a clear message rather than posting a request without a source config
+                throw new IOException("No streaming source config found for table " + tableDesc.getIdentity());
+            }
+            StreamTableCompatibilityCheckRequest streamRequest = new StreamTableCompatibilityCheckRequest();
+            streamRequest.setProjectName(ctx.getTgtProjectName());
+            streamRequest.setTableDesc(JsonUtil.writeValueAsIndentString(tableDesc));
+            streamRequest.setStreamSource(JsonUtil.writeValueAsIndentString(streamingSourceConfig));
+            new RestClient(ctx.getTargetAddress())
+                    .checkStreamTableCompatibility(JsonUtil.writeValueAsIndentString(streamRequest));
+        }
+    }
+
     public static class Context {
         private final QueryService queryService;
         private final CubeInstance cubeInstance;
@@ -466,4 +520,4 @@ public class MigrationRuleSet {
             return srcProjectName;
         }
     }
-}
\ No newline at end of file
+}
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java
index 67fce7e..31c4ed4 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java
@@ -101,7 +101,7 @@ public class MigrationService extends BasicService {
         String projectName = ctx.getTgtProjectName();
         try {
             sendApprovedMailQuietly(cubeName, projectName);
-
+            logger.info("migration approved, cube {}, project {}", cubeName, projectName);
             // do cube migration
             new CubeMigrationCLI().moveCube(localHost, ctx.getTargetAddress(), cubeName, projectName, "true", "false",
                     "true", "true", "false");
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 9ac2602..b5a805b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -34,6 +34,7 @@ import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.metrics.MetricsManager;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.storage.hybrid.HybridManager;
+import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
 
 public abstract class BasicService {
 
@@ -63,6 +64,10 @@ public abstract class BasicService {
         return StreamingManager.getInstance(getConfig());
     }
 
+    public StreamingSourceConfigManager getStreamingSourceConfigManager() {
+        return StreamingSourceConfigManager.getInstance(getConfig());
+    }
+
     public KafkaConfigManager getKafkaManager() throws IOException {
         return KafkaConfigManager.getInstance(getConfig());
     }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
index 7cb85c6..e66eee9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
@@ -23,7 +23,9 @@ import java.net.InetAddress;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -105,6 +107,26 @@ public class StreamingV2Service extends BasicService {
         receiverAdminClient = adminClient;
     }
 
+    // Throws when the already registered config (same name and project) differs in parser info or properties.
+    public void checkStreamingSourceCompatibility(final String prj, final StreamingSourceConfig streamingSourceConfig) throws Exception {
+        final String name = streamingSourceConfig.getName();
+        final String project = streamingSourceConfig.getProjectName(); // NOTE(review): prj is never read - confirm intended
+        final StreamingSourceConfig existing = getStreamingManagerV2().getConfig(name, project);
+        if (existing == null) {
+            return;
+        } else if (!Objects.equals(streamingSourceConfig.getParserInfo(), existing.getParserInfo())) {
+            logger.info("stream source parse info compatibility check, source {}, target {}",
+                    streamingSourceConfig.getParserInfo(), existing.getParserInfo());
+            throw new Exception(String.format(Locale.ROOT,
+                    "the stream source parse info is not compatible, name %s, project %s", name, project));
+        } else if (!Objects.equals(streamingSourceConfig.getProperties(), existing.getProperties())) {
+            logger.info("stream source properties compatibility check, source {}, target {}",
+                    streamingSourceConfig.getProperties(), existing.getProperties());
+            throw new Exception(String.format(Locale.ROOT,
+                    "the stream source properties are not compatible, name %s, project %s", name, project));
+        }
+    }
+
     public List<StreamingSourceConfig> listAllStreamingConfigs(final String table, final String projectName) throws IOException {
         List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList();
         if (StringUtils.isEmpty(table) || StringUtils.isEmpty(projectName)) {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
index 3a45f49..a1318c2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
@@ -45,11 +45,13 @@ import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
 import org.apache.kylin.shaded.com.google.common.collect.Iterables;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.shaded.com.google.common.collect.Sets;
+import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
 
 public class TableSchemaUpdateChecker {
     private final TableMetadataManager metadataManager;
     private final CubeManager cubeManager;
     private final DataModelManager dataModelManager;
+    private StreamingSourceConfigManager streamingSourceConfigManager;
 
     public static class CheckResult {
         private final boolean valid;
@@ -75,6 +77,11 @@ public class TableSchemaUpdateChecker {
                     format(Locale.ROOT, "Table '%s' is compatible with all existing cubes", tableName));
         }
 
+        static CheckResult validOnStreamTableDescCompatible(String tablename) {
+            return new CheckResult(true,
+                format(Locale.ROOT, "Stream table '%s' is compatible with existing stream table", tablename));
+        }
+
         static CheckResult invalidOnFetchSchema(String tableName, Exception e) {
             return new CheckResult(false,
                     format(Locale.ROOT, "Failed to fetch metadata of '%s': %s", tableName, e.getMessage()));
@@ -91,12 +98,22 @@ public class TableSchemaUpdateChecker {
                             "Found %d issue(s) with '%s':%n%s Please disable and " + "purge related " + "cube(s) first",
                             reasons.size(), tableName, buf.toString()));
         }
+
+        // Failed result for a stream table; the message carries the table name and the reason.
+        static CheckResult invalidOnStreamTableCompatible(String tablename, String reason) {
+            return new CheckResult(false, format(Locale.ROOT,
+                "Stream table '%s' is incompatible, the reason is %s", tablename, reason));
+        }
     }
 
-    TableSchemaUpdateChecker(TableMetadataManager metadataManager, CubeManager cubeManager, DataModelManager dataModelManager) {
+    TableSchemaUpdateChecker(
+        TableMetadataManager metadataManager, CubeManager cubeManager,
+        DataModelManager dataModelManager, StreamingSourceConfigManager streamingSourceConfigManager) {
         this.metadataManager = checkNotNull(metadataManager, "metadataManager is null");
         this.cubeManager = checkNotNull(cubeManager, "cubeManager is null");
         this.dataModelManager = checkNotNull(dataModelManager, "dataModelManager is null");
+        this.streamingSourceConfigManager =
+            checkNotNull(streamingSourceConfigManager, "streamingSourceConfigManager is null");
     }
 
     private List<CubeInstance> findCubeByTable(final TableDesc table) {
@@ -186,6 +203,25 @@ public class TableSchemaUpdateChecker {
         return true;
     }
 
+    /** Checks a stream table desc against the one stored in prj: source type first, then columns. */
+    public CheckResult streamTableCheckCompatibility(TableDesc newTableDesc, String prj) {
+        final String fullTableName = newTableDesc.getIdentity();
+        final TableDesc existing = metadataManager.getTableDesc(fullTableName, prj);
+        if (existing == null) {
+            // no table desc stored under this name in the project
+            return CheckResult.validOnFirstLoad(fullTableName);
+        }
+        if (newTableDesc.getSourceType() != existing.getSourceType()) {
+            return CheckResult.invalidOnStreamTableCompatible(fullTableName, format(Locale.ROOT,
+                    "the source type is %s, the target type is %s",
+                    newTableDesc.getSourceType(), existing.getSourceType()));
+        }
+        if (!checkAllColumnsInTableDesc(newTableDesc, existing)) {
+            return CheckResult.invalidOnStreamTableCompatible(fullTableName, "the columns are incompatible");
+        }
+        return CheckResult.validOnStreamTableDescCompatible(fullTableName);
+    }
+
     public CheckResult allowReload(TableDesc newTableDesc, String prj) {
         final String fullTableName = newTableDesc.getIdentity();
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 2d76acc..9d20766 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -124,7 +124,12 @@ public class TableService extends BasicService {
     private AclEvaluate aclEvaluate;
 
     public TableSchemaUpdateChecker getSchemaUpdateChecker() {
-        return new TableSchemaUpdateChecker(getTableManager(), getCubeManager(), getDataModelManager());
+        return new TableSchemaUpdateChecker(getTableManager(), getCubeManager(), getDataModelManager(), getStreamingSourceConfigManager());
+    }
+
+    public void checkStreamTableCompatibility(String project, TableDesc tableDesc) {
+        TableSchemaUpdateChecker.CheckResult result = getSchemaUpdateChecker().streamTableCheckCompatibility(tableDesc, project);
+        result.raiseExceptionWhenInvalid();
     }
 
     public void checkTableCompatibility(String prj, TableDesc tableDesc) {
@@ -201,7 +206,10 @@ public class TableService extends BasicService {
         // do schema check
         TableMetadataManager metaMgr = getTableManager();
         CubeManager cubeMgr = getCubeManager();
-        TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr, getDataModelManager());
+        TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr,
+                cubeMgr,
+                getDataModelManager(),
+                getStreamingSourceConfigManager());
         for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
             TableDesc tableDesc = pair.getFirst();
             TableSchemaUpdateChecker.CheckResult result = checker.allowReload(tableDesc, project);
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index 36376de..2fc1b83 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -238,6 +238,7 @@
             <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/cubes/checkCompatibility" access="permitAll"/>
             <scr:intercept-url pattern="/api/cubes/checkCompatibility/hiveTable" access="permitAll"/>
+            <scr:intercept-url pattern="/api/cubes/checkStreamTableCompatibility" access="permitAll"/>
             <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>
@@ -291,6 +292,7 @@
             <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/cubes/checkCompatibility" access="permitAll"/>
             <scr:intercept-url pattern="/api/cubes/checkCompatibility/hiveTable" access="permitAll"/>
+            <scr:intercept-url pattern="/api/cubes/checkStreamTableCompatibility" access="permitAll"/>
             <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
index 4070ae6..fa21ed1 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.stream.core.source;
 
 import java.util.Map;
+import java.util.Objects;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -80,4 +81,20 @@ public class MessageParserInfo {
         this.columnToSourceFieldMapping = columnToSourceFieldMapping;
     }
 
+    /** Equal only to another MessageParserInfo whose five parser settings all match. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) return true;
+        if (o == null || o.getClass() != getClass()) return false;
+        final MessageParserInfo other = (MessageParserInfo) o;
+        return formatTs == other.formatTs
+                && Objects.equals(tsColName, other.tsColName) && Objects.equals(tsParser, other.tsParser)
+                && Objects.equals(tsPattern, other.tsPattern)
+                && Objects.equals(columnToSourceFieldMapping, other.columnToSourceFieldMapping);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(tsColName, tsParser, tsPattern, formatTs, columnToSourceFieldMapping);
+    }
 }
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 0c3d6e9..0295bea 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -172,7 +172,14 @@ public class CubeMigrationCLI extends AbstractApplication {
 
         CubeManager cubeManager = CubeManager.getInstance(srcConfig);
         CubeInstance cube = cubeManager.getCube(cubeName);
-        logger.info("cube to be moved is : " + cubeName);
+        logger.info("cube to be moved is {}, project is  {}", cubeName, projectName);
+        if (cube.getDescriptor().getModel().getRootFactTable().getTableDesc().isStreamingTable()) {
+            logger.info("move streaming cube, project: {}, cube name {}", projectName, cubeName);
+            if (migrateSegment) {
+               throw new InterruptedException("Can't migrate stream cube with data");
+            }
+        }
+        logger.info("cube to be moved is {}, project is {}, the real execute is {}", cubeName, projectName, realExecute);
 
         if (migrateSegment) {
             checkCubeState(cube);
@@ -205,6 +212,7 @@ public class CubeMigrationCLI extends AbstractApplication {
             updateMeta(dstConfig, projectName, cubeName, cube.getModel());
             updateMeta(srcConfig, cube.getProject(), cubeName, cube.getModel());
         } else {
+            logger.info("show operations for cube {}, project {}", cubeName, cube.getProject());
             showOpts();
         }
     }
@@ -359,6 +367,8 @@ public class CubeMigrationCLI extends AbstractApplication {
             metaResource.add(ACL_PREFIX + cube.getModel().getUuid());
         }
 
+        // if the cube is a streaming cube, also add its stream source config;
+        // a streaming cube supports only one fact table
         if (cubeDesc.isStreamingCube()) {
             // add streaming source config info for streaming cube
             String tableName = cubeDesc.getModel().getRootFactTableName();
@@ -694,6 +704,7 @@ public class CubeMigrationCLI extends AbstractApplication {
         return srcItem;
     }
 
+
     private void updateMeta(KylinConfig config, String projectName, String cubeName, DataModelDesc model) {
         String[] nodes = config.getRawRestServers();
         Map<String, String> tableToProjects = new HashMap<>();
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/StreamTableCompatibilityCheckRequest.java b/tool/src/main/java/org/apache/kylin/tool/migration/StreamTableCompatibilityCheckRequest.java
new file mode 100644
index 0000000..0262bb7
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/StreamTableCompatibilityCheckRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+public class StreamTableCompatibilityCheckRequest {
+    private String projectName; // project the compatibility check runs against
+    private String tableDesc; // table desc carried as a JSON string
+    private String streamSource; // streaming source config carried as a JSON string
+
+    public String getProjectName() {
+        return projectName;
+    }
+
+    public void setProjectName(String projectName) {
+        this.projectName = projectName;
+    }
+
+    public String getTableDesc() {
+        return tableDesc;
+    }
+
+    public void setTableDesc(String tableDesc) {
+        this.tableDesc = tableDesc;
+    }
+
+    public String getStreamSource() {
+        return streamSource;
+    }
+
+    public void setStreamSource(String streamSource) {
+        this.streamSource = streamSource;
+    }
+}