You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/05/31 00:17:32 UTC
[kylin] branch master updated: add more check rules for migration streaming cube
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 4572cf4 add more check rules for migration streaming cube
4572cf4 is described below
commit 4572cf488c20c383b8f5f6e11bf395c1fde4cb49
Author: kliu3 <li...@apache.org>
AuthorDate: Wed Apr 7 15:55:21 2021 +0800
add more check rules for migration streaming cube
---
.../apache/kylin/common/restclient/RestClient.java | 4 ++
.../apache/kylin/metadata/model/ColumnDesc.java | 4 ++
.../org/apache/kylin/metadata/model/TableDesc.java | 4 ++
.../kylin/rest/controller/MigrationController.java | 42 +++++++++++++++
.../kylin/rest/service/MigrationRuleSet.java | 60 ++++++++++++++++++++--
.../kylin/rest/service/MigrationService.java | 2 +-
.../apache/kylin/rest/service/BasicService.java | 5 ++
.../kylin/rest/service/StreamingV2Service.java | 22 ++++++++
.../rest/service/TableSchemaUpdateChecker.java | 38 +++++++++++++-
.../apache/kylin/rest/service/TableService.java | 12 ++++-
server/src/main/resources/kylinSecurity.xml | 2 +
.../stream/core/source/MessageParserInfo.java | 17 ++++++
.../org/apache/kylin/tool/CubeMigrationCLI.java | 13 ++++-
.../StreamTableCompatibilityCheckRequest.java | 49 ++++++++++++++++++
14 files changed, 266 insertions(+), 8 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index fcd8706..a054c42 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -393,6 +393,10 @@ public class RestClient {
checkCompatibility(jsonRequest, baseUrl + "/cubes/checkCompatibility");
}
+    /**
+     * Sends a stream-table compatibility check request to the
+     * {@code /cubes/checkStreamTableCompatibility} endpoint under {@code baseUrl}.
+     *
+     * @param jsonRequest the request payload, passed through unchanged
+     * @throws IOException propagated from {@code checkCompatibility}
+     */
+    public void checkStreamTableCompatibility(String jsonRequest) throws IOException {
+        final String endpoint = baseUrl + "/cubes/checkStreamTableCompatibility";
+        checkCompatibility(jsonRequest, endpoint);
+    }
+
private void checkCompatibility(String jsonRequest, String url) throws IOException {
HttpPost post = newPost(url);
try {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index 455f586..7ec5bb3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.relaxng.datatype.Datatype;
/**
* Column Metadata from Source. All name should be uppercase.
@@ -158,6 +159,9 @@ public class ColumnDesc implements Serializable {
}
public DataType getType() {
+ if (type == null && datatype != null) {
+ this.type = DataType.getType(datatype);
+ }
return type;
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index d99ff54..9f701dc 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -456,4 +456,8 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
return false;
}
+    /**
+     * Tells whether this table's source type is {@link ISourceAware#ID_KAFKA_HIVE}.
+     *
+     * @return {@code true} when {@code sourceType} equals {@code ISourceAware.ID_KAFKA_HIVE}
+     */
+    public boolean isLambdaTable() {
+        final boolean kafkaHiveSource = (ISourceAware.ID_KAFKA_HIVE == sourceType);
+        return kafkaHiveSource;
+    }
+
}
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
index 45f83ea..58860c5 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
@@ -35,8 +35,11 @@ import org.apache.kylin.rest.service.MigrationRuleSet;
import org.apache.kylin.rest.service.MigrationService;
import org.apache.kylin.rest.service.ModelService;
import org.apache.kylin.rest.service.QueryService;
+import org.apache.kylin.rest.service.StreamingV2Service;
import org.apache.kylin.rest.service.TableService;
+import org.apache.kylin.stream.core.source.StreamingSourceConfig;
import org.apache.kylin.tool.migration.CompatibilityCheckRequest;
+import org.apache.kylin.tool.migration.StreamTableCompatibilityCheckRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -73,6 +76,9 @@ public class MigrationController extends BasicController {
@Autowired
private TableService tableService;
+ @Autowired
+ private StreamingV2Service streamingV2Service;
+
private final String targetHost = KylinConfig.getInstanceFromEnv().getMigrationTargetAddress();
private CubeInstance getCubeInstance(String cubeName) {
@@ -140,6 +146,38 @@ public class MigrationController extends BasicController {
return Strings.isNullOrEmpty(targetHost) ? this.targetHost : targetHost;
}
+    /**
+     * Checks the stream table and the streaming source config carried by the
+     * request against the project named in the request.
+     * <p>
+     * The table desc is checked first through {@code tableService}, then the
+     * streaming source config through {@code streamingV2Service}. Any exception
+     * raised by either step is logged and rethrown as a {@link ConflictException}.
+     *
+     * @param request carries the project name plus the JSON-serialized table desc
+     *                and streaming source config
+     */
+    @RequestMapping(value = "/checkStreamTableCompatibility", method = { RequestMethod.POST })
+    @ResponseBody
+    public void checkStreamTableCompatibility(@RequestBody StreamTableCompatibilityCheckRequest request) {
+        // step 1: the table desc; it stays in scope because step 2 logs its name and project
+        TableDesc tableDesc;
+        try {
+            tableDesc = JsonUtil.readValue(request.getTableDesc(), TableDesc.class);
+            logger.info("Stream table compatibility check for table {}, project {}",
+                    tableDesc.getName(), tableDesc.getProject());
+            tableService.checkStreamTableCompatibility(request.getProjectName(), tableDesc);
+            logger.info("Pass stream table compatibility check for table {}, project {}",
+                    tableDesc.getName(), tableDesc.getProject());
+        } catch (Exception tableCheckFailure) {
+            logger.error(tableCheckFailure.getMessage(), tableCheckFailure);
+            throw new ConflictException(tableCheckFailure.getMessage(), tableCheckFailure);
+        }
+
+        // step 2: the streaming source config
+        try {
+            StreamingSourceConfig sourceConfig = JsonUtil.readValue(request.getStreamSource(),
+                    StreamingSourceConfig.class);
+            logger.info("Stream source config compatibility check for table {}, project {}",
+                    tableDesc.getName(), tableDesc.getProject());
+            streamingV2Service.checkStreamingSourceCompatibility(request.getProjectName(), sourceConfig);
+            logger.info("Pass stream source config compatibility check for table {}, project {}",
+                    tableDesc.getName(), tableDesc.getProject());
+        } catch (Exception sourceCheckFailure) {
+            logger.error(sourceCheckFailure.getMessage(), sourceCheckFailure);
+            throw new ConflictException(sourceCheckFailure.getMessage(), sourceCheckFailure);
+        }
+    }
+
/**
* Check the schema compatibility for table, model desc
*/
@@ -199,4 +237,8 @@ public class MigrationController extends BasicController {
}
return result;
}
+
+    /**
+     * Tells whether the root fact table of the cube's model is a streaming table.
+     *
+     * @param cube the cube whose descriptor and model are inspected
+     * @return the result of {@code isStreamingTable()} on the root fact table's desc
+     */
+    private boolean isStreamingTable(CubeInstance cube) {
+        TableDesc rootFactTableDesc = cube.getDescriptor().getModel().getRootFactTable().getTableDesc();
+        return rootFactTableDesc.isStreamingTable();
+    }
}
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java
index de158bd..ab67643 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java
@@ -55,7 +55,10 @@ import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.stream.core.source.StreamingSourceConfig;
+import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
import org.apache.kylin.tool.migration.CompatibilityCheckRequest;
+import org.apache.kylin.tool.migration.StreamTableCompatibilityCheckRequest;
import org.apache.kylin.tool.query.QueryGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +89,7 @@ public class MigrationRuleSet {
public static final Rule DEFAULT_SEGMENT_RULE = new SegmentRule();
public static final Rule DEFAULT_CUBE_OVERWRITE_RULE = new CubeOverwriteRule();
public static final Rule DEFAULT_QUERY_LATENCY_RULE = new QueryLatencyRule();
+ public static final Rule DEFAULT_STREAM_TABLE_CHECK_RULE = new StreamTableCompatibilityRule();
private static List<Rule> MUSTTOPASS_RULES = Lists.newLinkedList();
@@ -114,7 +118,9 @@ public class MigrationRuleSet {
// initialize default rules
static {
register(DEFAULT_HIVE_TABLE_CONSISTENCY_RULE, DEFAULT_CUBE_STATUS_RULE, DEFAULT_PROJECT_EXIST_RULE,
- DEFAULT_EMAIL_NOTIFY_RULE, DEFAULT_SEGMENT_RULE, DEFAULT_CUBE_OVERWRITE_RULE, DEFAULT_COMPATIBLE_RULE);
+ DEFAULT_EMAIL_NOTIFY_RULE, DEFAULT_SEGMENT_RULE, DEFAULT_CUBE_OVERWRITE_RULE,
+ DEFAULT_COMPATIBLE_RULE, DEFAULT_STREAM_TABLE_CHECK_RULE);
+
register(false, DEFAULT_AUTO_MERGE_RULE, DEFAULT_EXPANSION_RULE, DEFAULT_QUERY_LATENCY_RULE);
}
@@ -369,6 +375,17 @@ public class MigrationRuleSet {
@Override
public void apply(Context ctx) throws RuleValidationException {
+ // check table type
+ CubeInstance cube = ctx.getCubeInstance();
+ boolean isStreamTable = cube.getDescriptor().getModel().getRootFactTable().getTableDesc().isStreamingTable();
+ if (isStreamTable) {
+ // check lambda
+ if (!cube.getDescriptor().getModel().getRootFactTable().getTableDesc().isLambdaTable()) {
+ // streaming table without lambda doesn't need to check hive table
+ return;
+ }
+ }
+
// de-dup
SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
for (TableRef tableRef : ctx.getCubeInstance().getModel().getAllTables()) {
@@ -400,7 +417,8 @@ public class MigrationRuleSet {
// do schema check
KylinConfig config = KylinConfig.getInstanceFromEnv();
TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(TableMetadataManager.getInstance(config),
- CubeManager.getInstance(config), DataModelManager.getInstance(config));
+ CubeManager.getInstance(config), DataModelManager.getInstance(config),
+ StreamingSourceConfigManager.getInstance(config));
for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
try {
TableSchemaUpdateChecker.CheckResult result = checker.allowReload(pair.getFirst(),
@@ -417,6 +435,42 @@ public class MigrationRuleSet {
}
+    /**
+     * Migration rule for cubes whose root fact table is a streaming table: it
+     * sends the table desc and the streaming source config to the target
+     * address for a compatibility check. Cubes on non-streaming tables pass
+     * without any remote call.
+     */
+    private static class StreamTableCompatibilityRule implements Rule {
+
+        /**
+         * Runs the stream table check and reports an I/O failure as a rule violation.
+         */
+        @Override
+        public void apply(Context ctx) throws RuleValidationException {
+            try {
+                checkStreamTableSchema(ctx);
+            } catch (IOException ioFailure) {
+                throw new RuleValidationException(ioFailure.getMessage(), ioFailure);
+            }
+        }
+
+        /**
+         * Builds the compatibility check request for the cube's root fact table
+         * and posts it to the target address through a {@link RestClient}.
+         *
+         * @param ctx the migration context holding the cube, the target project
+         *            name and the target address
+         * @throws IOException if serializing the request or the remote check fails
+         */
+        public void checkStreamTableSchema(Context ctx) throws IOException {
+            final TableDesc factTableDesc = ctx.getCubeInstance().getModel().getRootFactTable().getTableDesc();
+            if (!factTableDesc.isStreamingTable()) {
+                // only streaming tables are subject to this rule
+                return;
+            }
+
+            final CubeInstance cube = ctx.cubeInstance;
+            logger.info("check the stream table schema, cubename {}, project {}, lambda {}",
+                    cube.getName(), cube.getProject(), factTableDesc.isLambdaTable());
+
+            // streaming source config registered for the fact table in its own project
+            StreamingSourceConfigManager sourceConfigManager = StreamingSourceConfigManager
+                    .getInstance(cube.getConfig());
+            StreamingSourceConfig sourceConfig = sourceConfigManager.getConfig(factTableDesc.getIdentity(),
+                    factTableDesc.getProject());
+
+            StreamTableCompatibilityCheckRequest payload = new StreamTableCompatibilityCheckRequest();
+            payload.setProjectName(ctx.getTgtProjectName());
+            payload.setTableDesc(JsonUtil.writeValueAsIndentString(factTableDesc));
+            payload.setStreamSource(JsonUtil.writeValueAsIndentString(sourceConfig));
+
+            RestClient targetClient = new RestClient(ctx.getTargetAddress());
+            targetClient.checkStreamTableCompatibility(JsonUtil.writeValueAsIndentString(payload));
+        }
+    }
+
public static class Context {
private final QueryService queryService;
private final CubeInstance cubeInstance;
@@ -466,4 +520,4 @@ public class MigrationRuleSet {
return srcProjectName;
}
}
-}
\ No newline at end of file
+}
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java
index 67fce7e..31c4ed4 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java
@@ -101,7 +101,7 @@ public class MigrationService extends BasicService {
String projectName = ctx.getTgtProjectName();
try {
sendApprovedMailQuietly(cubeName, projectName);
-
+ logger.info("migration approved, cube {}, project {}", cubeName, projectName);
// do cube migration
new CubeMigrationCLI().moveCube(localHost, ctx.getTargetAddress(), cubeName, projectName, "true", "false",
"true", "true", "false");
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 9ac2602..b5a805b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -34,6 +34,7 @@ import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.metrics.MetricsManager;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.storage.hybrid.HybridManager;
+import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
public abstract class BasicService {
@@ -63,6 +64,10 @@ public abstract class BasicService {
return StreamingManager.getInstance(getConfig());
}
+ /**
+ * Returns the {@link StreamingSourceConfigManager} obtained from
+ * {@code StreamingSourceConfigManager.getInstance(getConfig())}.
+ */
+ public StreamingSourceConfigManager getStreamingSourceConfigManager() {
+ return StreamingSourceConfigManager.getInstance(getConfig());
+ }
+
public KafkaConfigManager getKafkaManager() throws IOException {
return KafkaConfigManager.getInstance(getConfig());
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
index 7cb85c6..e66eee9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java
@@ -23,7 +23,9 @@ import java.net.InetAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -105,6 +107,26 @@ public class StreamingV2Service extends BasicService {
receiverAdminClient = adminClient;
}
+    /**
+     * Checks that an incoming streaming source config is compatible with the
+     * config of the same name already registered in the project under check.
+     * <p>
+     * The check passes when no config of that name exists there. Otherwise the
+     * parser info and the properties must both equal the existing ones.
+     *
+     * @param prj the project to look the existing config up in; when empty, the
+     *            project name carried by {@code streamingSourceConfig} is used
+     * @param streamingSourceConfig the incoming config to validate
+     * @throws IllegalArgumentException if {@code streamingSourceConfig} is null
+     * @throws Exception if the parser info or the properties differ from the
+     *                   existing config
+     */
+    public void checkStreamingSourceCompatibility(final String prj, final StreamingSourceConfig streamingSourceConfig) throws Exception {
+        if (streamingSourceConfig == null) {
+            // fail with a readable message rather than a message-less NullPointerException
+            throw new IllegalArgumentException(String.format(Locale.ROOT,
+                    "the stream source config to check is missing, project %s", prj));
+        }
+
+        // Look the existing config up in the project being checked, consistent with the
+        // table desc check; the project recorded inside the incoming config is only a fallback.
+        final String project = StringUtils.isEmpty(prj) ? streamingSourceConfig.getProjectName() : prj;
+        StreamingSourceConfig existing = getStreamingManagerV2().getConfig(streamingSourceConfig.getName(), project);
+        if (existing == null) {
+            // nothing registered yet, so nothing can conflict
+            return;
+        }
+
+        if (!Objects.equals(streamingSourceConfig.getParserInfo(), existing.getParserInfo())) {
+            logger.info("stream source parse info compatibility check, source {}, target {}",
+                    streamingSourceConfig.getParserInfo(), existing.getParserInfo());
+            throw new Exception(String.format(Locale.ROOT,
+                    "the stream source parse info is not compatible, name %s, project %s",
+                    streamingSourceConfig.getName(), project));
+        }
+
+        if (!Objects.equals(streamingSourceConfig.getProperties(), existing.getProperties())) {
+            logger.info("stream source properties compatibility check, source {}, target {}",
+                    streamingSourceConfig.getProperties(), existing.getProperties());
+            throw new Exception(String.format(Locale.ROOT,
+                    "the stream source properties are not compatible, name %s, project %s",
+                    streamingSourceConfig.getName(), project));
+        }
+    }
+
public List<StreamingSourceConfig> listAllStreamingConfigs(final String table, final String projectName) throws IOException {
List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList();
if (StringUtils.isEmpty(table) || StringUtils.isEmpty(projectName)) {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
index 3a45f49..a1318c2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
@@ -45,11 +45,13 @@ import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
import org.apache.kylin.shaded.com.google.common.collect.Iterables;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
+import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
public class TableSchemaUpdateChecker {
private final TableMetadataManager metadataManager;
private final CubeManager cubeManager;
private final DataModelManager dataModelManager;
+ private StreamingSourceConfigManager streamingSourceConfigManager;
public static class CheckResult {
private final boolean valid;
@@ -75,6 +77,11 @@ public class TableSchemaUpdateChecker {
format(Locale.ROOT, "Table '%s' is compatible with all existing cubes", tableName));
}
+        /**
+         * Builds a valid result stating that the named stream table is
+         * compatible with the existing stream table.
+         *
+         * @param tablename the table name placed in the message
+         */
+        static CheckResult validOnStreamTableDescCompatible(String tablename) {
+            final String message = format(Locale.ROOT,
+                    "Stream table '%s' is compatible with existing stream table", tablename);
+            return new CheckResult(true, message);
+        }
+
static CheckResult invalidOnFetchSchema(String tableName, Exception e) {
return new CheckResult(false,
format(Locale.ROOT, "Failed to fetch metadata of '%s': %s", tableName, e.getMessage()));
@@ -91,12 +98,22 @@ public class TableSchemaUpdateChecker {
"Found %d issue(s) with '%s':%n%s Please disable and " + "purge related " + "cube(s) first",
reasons.size(), tableName, buf.toString()));
}
+
+        /**
+         * Builds an invalid result for a stream table that failed the
+         * compatibility check.
+         *
+         * @param tablename the table that failed; included in the message so the
+         *                  failure identifies which table is affected
+         * @param reason    a description of the incompatibility
+         */
+        static CheckResult invalidOnStreamTableCompatible(String tablename, String reason) {
+            return new CheckResult(
+                    false,
+                    format(Locale.ROOT, "Stream table '%s' is incompatible, the reason is %s", tablename, reason));
+        }
}
- TableSchemaUpdateChecker(TableMetadataManager metadataManager, CubeManager cubeManager, DataModelManager dataModelManager) {
+ TableSchemaUpdateChecker(
+ TableMetadataManager metadataManager, CubeManager cubeManager,
+ DataModelManager dataModelManager, StreamingSourceConfigManager streamingSourceConfigManager) {
this.metadataManager = checkNotNull(metadataManager, "metadataManager is null");
this.cubeManager = checkNotNull(cubeManager, "cubeManager is null");
this.dataModelManager = checkNotNull(dataModelManager, "dataModelManager is null");
+ this.streamingSourceConfigManager =
+ checkNotNull(streamingSourceConfigManager, "streamingSourceConfigManager is null");
}
private List<CubeInstance> findCubeByTable(final TableDesc table) {
@@ -186,6 +203,25 @@ public class TableSchemaUpdateChecker {
return true;
}
+    /**
+     * Compares an incoming stream table desc with the table of the same
+     * identity already present in the given project.
+     * <p>
+     * The result is valid when no such table exists yet, or when the source
+     * types match and {@code checkAllColumnsInTableDesc} accepts the columns.
+     *
+     * @param newTableDesc the incoming table desc
+     * @param prj          the project in which the existing table is looked up
+     * @return the check result, carrying the reason when invalid
+     */
+    public CheckResult streamTableCheckCompatibility(TableDesc newTableDesc, String prj) {
+        final String tableIdentity = newTableDesc.getIdentity();
+        final TableDesc current = metadataManager.getTableDesc(tableIdentity, prj);
+
+        if (current == null) {
+            return CheckResult.validOnFirstLoad(tableIdentity);
+        }
+
+        // 1. the source types must match
+        if (newTableDesc.getSourceType() != current.getSourceType()) {
+            final String mismatch = format(Locale.ROOT, "the source type is %s, the target type is %s",
+                    newTableDesc.getSourceType(), current.getSourceType());
+            return CheckResult.invalidOnStreamTableCompatible(tableIdentity, mismatch);
+        }
+
+        // 2. all columns must be compatible
+        if (!checkAllColumnsInTableDesc(newTableDesc, current)) {
+            return CheckResult.invalidOnStreamTableCompatible(tableIdentity, "the columns are incompatible");
+        }
+
+        return CheckResult.validOnStreamTableDescCompatible(tableIdentity);
+    }
+
public CheckResult allowReload(TableDesc newTableDesc, String prj) {
final String fullTableName = newTableDesc.getIdentity();
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 2d76acc..9d20766 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -124,7 +124,12 @@ public class TableService extends BasicService {
private AclEvaluate aclEvaluate;
public TableSchemaUpdateChecker getSchemaUpdateChecker() {
- return new TableSchemaUpdateChecker(getTableManager(), getCubeManager(), getDataModelManager());
+ return new TableSchemaUpdateChecker(getTableManager(), getCubeManager(), getDataModelManager(), getStreamingSourceConfigManager());
+ }
+
+    /**
+     * Runs the stream table compatibility check for the given project and
+     * raises an exception when the result is invalid.
+     *
+     * @param project   the project the table desc is checked against
+     * @param tableDesc the incoming table desc
+     */
+    public void checkStreamTableCompatibility(String project, TableDesc tableDesc) {
+        final TableSchemaUpdateChecker checker = getSchemaUpdateChecker();
+        checker.streamTableCheckCompatibility(tableDesc, project).raiseExceptionWhenInvalid();
     }
public void checkTableCompatibility(String prj, TableDesc tableDesc) {
@@ -201,7 +206,10 @@ public class TableService extends BasicService {
// do schema check
TableMetadataManager metaMgr = getTableManager();
CubeManager cubeMgr = getCubeManager();
- TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr, getDataModelManager());
+ TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr,
+ cubeMgr,
+ getDataModelManager(),
+ getStreamingSourceConfigManager());
for (Pair<TableDesc, TableExtDesc> pair : allMeta) {
TableDesc tableDesc = pair.getFirst();
TableSchemaUpdateChecker.CheckResult result = checker.allowReload(tableDesc, project);
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index 36376de..2fc1b83 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -238,6 +238,7 @@
<scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
<scr:intercept-url pattern="/api/cubes/checkCompatibility" access="permitAll"/>
<scr:intercept-url pattern="/api/cubes/checkCompatibility/hiveTable" access="permitAll"/>
+ <scr:intercept-url pattern="/api/cubes/checkStreamTableCompatibility" access="permitAll"/>
<scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
<scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
<scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>
@@ -291,6 +292,7 @@
<scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
<scr:intercept-url pattern="/api/cubes/checkCompatibility" access="permitAll"/>
<scr:intercept-url pattern="/api/cubes/checkCompatibility/hiveTable" access="permitAll"/>
+ <scr:intercept-url pattern="/api/cubes/checkStreamTableCompatibility" access="permitAll"/>
<scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
<scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
<scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
index 4070ae6..fa21ed1 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java
@@ -19,6 +19,7 @@
package org.apache.kylin.stream.core.source;
import java.util.Map;
+import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -80,4 +81,20 @@ public class MessageParserInfo {
this.columnToSourceFieldMapping = columnToSourceFieldMapping;
}
+    /**
+     * Two parser infos are equal when they are of the same class and agree on
+     * {@code formatTs}, {@code tsColName}, {@code tsParser}, {@code tsPattern}
+     * and {@code columnToSourceFieldMapping}.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final MessageParserInfo other = (MessageParserInfo) o;
+        if (formatTs != other.formatTs) {
+            return false;
+        }
+        if (!Objects.equals(tsColName, other.tsColName)) {
+            return false;
+        }
+        if (!Objects.equals(tsParser, other.tsParser)) {
+            return false;
+        }
+        if (!Objects.equals(tsPattern, other.tsPattern)) {
+            return false;
+        }
+        return Objects.equals(columnToSourceFieldMapping, other.columnToSourceFieldMapping);
+    }
+
+    /**
+     * Hashes the same five fields that {@code equals} compares.
+     */
+    @Override
+    public int hashCode() {
+        final int combined = Objects.hash(tsColName, tsParser, tsPattern, formatTs, columnToSourceFieldMapping);
+        return combined;
+    }
}
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 0c3d6e9..0295bea 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -172,7 +172,14 @@ public class CubeMigrationCLI extends AbstractApplication {
CubeManager cubeManager = CubeManager.getInstance(srcConfig);
CubeInstance cube = cubeManager.getCube(cubeName);
- logger.info("cube to be moved is : " + cubeName);
+ logger.info("cube to be moved is {}, project is {}", cubeName, projectName);
+ if (cube.getDescriptor().getModel().getRootFactTable().getTableDesc().isStreamingTable()) {
+ logger.info("move streaming cube, project: {}, cube name {}", projectName, cubeName);
+ if (migrateSegment) {
+ throw new InterruptedException("Can't migrate stream cube with data");
+ }
+ }
+ logger.info("cube to be moved is {}, project is {}, the real execute is {}", cubeName, projectName, realExecute);
if (migrateSegment) {
checkCubeState(cube);
@@ -205,6 +212,7 @@ public class CubeMigrationCLI extends AbstractApplication {
updateMeta(dstConfig, projectName, cubeName, cube.getModel());
updateMeta(srcConfig, cube.getProject(), cubeName, cube.getModel());
} else {
+ logger.info("show operations for cube {}, project {}", cubeName, cube.getProject());
showOpts();
}
}
@@ -359,6 +367,8 @@ public class CubeMigrationCLI extends AbstractApplication {
metaResource.add(ACL_PREFIX + cube.getModel().getUuid());
}
+ // if the cube is a streaming cube, also add its stream source config
+ // (a streaming cube supports only one fact table)
if (cubeDesc.isStreamingCube()) {
// add streaming source config info for streaming cube
String tableName = cubeDesc.getModel().getRootFactTableName();
@@ -694,6 +704,7 @@ public class CubeMigrationCLI extends AbstractApplication {
return srcItem;
}
+
private void updateMeta(KylinConfig config, String projectName, String cubeName, DataModelDesc model) {
String[] nodes = config.getRawRestServers();
Map<String, String> tableToProjects = new HashMap<>();
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/StreamTableCompatibilityCheckRequest.java b/tool/src/main/java/org/apache/kylin/tool/migration/StreamTableCompatibilityCheckRequest.java
new file mode 100644
index 0000000..0262bb7
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/StreamTableCompatibilityCheckRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+/**
+ * Request body for the stream table compatibility check: a project name plus
+ * two string payloads, the table desc and the stream source.
+ */
+public class StreamTableCompatibilityCheckRequest {
+    private String projectName;
+    private String tableDesc;
+    private String streamSource;
+
+    /** @return the project name, or {@code null} if none was set */
+    public String getProjectName() {
+        return this.projectName;
+    }
+
+    /** @param projectName the project name to carry in the request */
+    public void setProjectName(String projectName) {
+        this.projectName = projectName;
+    }
+
+    /** @return the table desc payload, or {@code null} if none was set */
+    public String getTableDesc() {
+        return this.tableDesc;
+    }
+
+    /** @param tableDesc the table desc payload to carry in the request */
+    public void setTableDesc(String tableDesc) {
+        this.tableDesc = tableDesc;
+    }
+
+    /** @return the stream source payload, or {@code null} if none was set */
+    public String getStreamSource() {
+        return this.streamSource;
+    }
+
+    /** @param streamSource the stream source payload to carry in the request */
+    public void setStreamSource(String streamSource) {
+        this.streamSource = streamSource;
+    }
+}