You are viewing a plain-text version of this content. The canonical (linked) version is available on the original mailing-list archive page.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/24 12:57:19 UTC
[kylin] 01/04: KYLIN-3515 Add uuid for materialized table of hive view
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch 2.5.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c40188f40de14f7800dab58632e30b3ee42b6f39
Author: nichunen <ch...@kyligence.io>
AuthorDate: Mon Sep 3 15:34:10 2018 +0800
KYLIN-3515 Add uuid for materialized table of hive view
---
.../src/main/java/org/apache/kylin/cube/CubeManager.java | 8 ++++----
.../apache/kylin/cube/cli/DictionaryGeneratorCLI.java | 16 +++++++++-------
.../java/org/apache/kylin/dict/lookup/SnapshotCLI.java | 5 ++++-
.../apache/kylin/job/execution/AbstractExecutable.java | 8 ++++++++
.../kylin/job/execution/DefaultChainedExecutable.java | 4 +++-
.../apache/kylin/job/execution/ExecutableManager.java | 4 ++++
.../java/org/apache/kylin/metadata/model/TableDesc.java | 8 ++++++++
.../src/main/java/org/apache/kylin/source/ISource.java | 2 +-
.../main/java/org/apache/kylin/source/SourceManager.java | 4 ++--
.../main/java/org/apache/kylin/engine/mr/IMRInput.java | 2 +-
.../org/apache/kylin/engine/mr/JobBuilderSupport.java | 1 +
.../src/main/java/org/apache/kylin/engine/mr/MRUtil.java | 8 ++++----
.../kylin/engine/mr/steps/CreateDictionaryJob.java | 4 +++-
.../mr/steps/lookup/LookupSnapshotToMetaStoreStep.java | 2 +-
.../apache/kylin/source/hive/ITSnapshotManagerTest.java | 2 +-
.../main/java/org/apache/kylin/rest/msg/CnMessage.java | 4 ++++
.../src/main/java/org/apache/kylin/rest/msg/Message.java | 4 ++++
.../java/org/apache/kylin/rest/service/CubeService.java | 9 ++++++++-
.../java/org/apache/kylin/rest/service/JobService.java | 7 +++++++
.../java/org/apache/kylin/rest/service/TableService.java | 2 +-
.../java/org/apache/kylin/source/hive/HiveInputBase.java | 10 +++++-----
.../java/org/apache/kylin/source/hive/HiveMRInput.java | 13 ++++++++-----
.../java/org/apache/kylin/source/hive/HiveSource.java | 4 ++--
.../org/apache/kylin/source/hive/HiveSparkInput.java | 6 ++++--
.../source/hive/cardinality/ColumnCardinalityMapper.java | 2 +-
.../hive/cardinality/HiveColumnCardinalityJob.java | 6 ++++--
.../java/org/apache/kylin/source/jdbc/JdbcSource.java | 2 +-
.../java/org/apache/kylin/source/kafka/KafkaMRInput.java | 2 +-
.../java/org/apache/kylin/source/kafka/KafkaSource.java | 2 +-
.../kylin/storage/hbase/lookup/HBaseLookupMRSteps.java | 2 +-
.../storage/hbase/lookup/LookupTableToHFileJob.java | 5 +++--
.../storage/hbase/lookup/LookupTableToHFileMapper.java | 4 +++-
32 files changed, 112 insertions(+), 50 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 2a56941..0dc825f 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -1028,8 +1028,8 @@ public class CubeManager implements IRealizationProvider {
return dictAssist.getDictionary(cubeSeg, col);
}
- public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {
- return dictAssist.buildSnapshotTable(cubeSeg, lookupTable);
+ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException {
+ return dictAssist.buildSnapshotTable(cubeSeg, lookupTable, uuid);
}
private TableMetadataManager getMetadataManager() {
@@ -1103,7 +1103,7 @@ public class CubeManager implements IRealizationProvider {
return (Dictionary<String>) info.getDictionaryObject();
}
- public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {
+ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException {
// work on copy instead of cached objects
CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
@@ -1112,7 +1112,7 @@ public class CubeManager implements IRealizationProvider {
SnapshotManager snapshotMgr = getSnapshotManager();
TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
- IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
+ IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid);
SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig());
CubeDesc cubeDesc = cubeSeg.getCubeDesc();
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 7fcf320..6de42ac 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -42,26 +42,28 @@ public class DictionaryGeneratorCLI {
private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
- public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
+ public static void processSegment(KylinConfig config, String cubeName, String segmentID, String uuid,
+ DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
CubeSegment segment = cube.getSegmentById(segmentID);
- processSegment(config, segment, factTableValueProvider, dictProvider);
+ processSegment(config, segment, uuid, factTableValueProvider, dictProvider);
}
- private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
+ private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String uuid,
+ DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
CubeManager cubeMgr = CubeManager.getInstance(config);
// dictionary
for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
logger.info("Building dictionary for " + col);
IReadableTable inpTable = factTableValueProvider.getDistinctValuesFor(col);
-
+
Dictionary<String> preBuiltDict = null;
if (dictProvider != null) {
preBuiltDict = dictProvider.getDictionary(col);
}
-
+
if (preBuiltDict != null) {
logger.debug("Dict for '" + col.getName() + "' has already been built, save it");
cubeMgr.saveDictionary(cubeSeg, col, inpTable, preBuiltDict);
@@ -87,9 +89,9 @@ public class DictionaryGeneratorCLI {
for (String tableIdentity : toSnapshot) {
logger.info("Building snapshot of " + tableIdentity);
- cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity);
+ cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity, uuid);
}
-
+
CubeInstance updatedCube = cubeMgr.getCube(cubeSeg.getCubeInstance().getName());
cubeSeg = updatedCube.getSegmentById(cubeSeg.getUuid());
for (TableRef lookup : toCheckLookup) {
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
index f965d18..e30d156 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
@@ -42,7 +42,10 @@ public class SnapshotCLI {
if (tableDesc == null)
throw new IllegalArgumentException("Not table found by " + table);
- SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceManager.createReadableTable(tableDesc), tableDesc, overwriteUUID);
+ if (tableDesc.isView())
+ throw new IllegalArgumentException("Build snapshot of hive view \'" + table + "\' not supported.");
+
+ SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceManager.createReadableTable(tableDesc, null), tableDesc, overwriteUUID);
System.out.println("resource path updated: " + snapshot.getResourcePath());
}
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index dbbfc39..ad22abc 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -62,6 +62,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
private KylinConfig config;
private String name;
private String id;
+ private AbstractExecutable parentExecutable = null;
private Map<String, String> params = Maps.newHashMap();
public AbstractExecutable() {
@@ -396,6 +397,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
}
}
+ public AbstractExecutable getParentExecutable() {
+ return parentExecutable;
+ }
+ public void setParentExecutable(AbstractExecutable parentExecutable) {
+ this.parentExecutable = parentExecutable;
+ }
+
public static long getExtraInfoAsLong(Output output, String key, long defaultValue) {
final String str = output.getExtra().get(key);
if (str != null) {
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 2297be7..a8a91fd 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job.execution;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import org.apache.kylin.common.KylinConfig;
@@ -169,7 +170,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
@Override
public void addTask(AbstractExecutable executable) {
- executable.setId(getId() + "-" + String.format("%02d", subTasks.size()));
+ executable.setParentExecutable(this);
+ executable.setId(getId() + "-" + String.format(Locale.ROOT, "%02d", subTasks.size()));
this.subTasks.add(executable);
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index d37b3da..788a7fb 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -530,6 +530,10 @@ public class ExecutableManager {
if (tasks != null && !tasks.isEmpty()) {
Preconditions.checkArgument(result instanceof ChainedExecutable);
for (ExecutablePO subTask : tasks) {
+ AbstractExecutable subTaskExecutable = parseTo(subTask);
+ if (subTaskExecutable != null) {
+ subTaskExecutable.setParentExecutable(result);
+ }
((ChainedExecutable) result).addTask(parseTo(subTask));
}
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index f69b05d..3f9a774 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -359,6 +359,14 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
return MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + database.getName() + "_" + name;
}
+ public String getMaterializedName(String uuid) {
+ if (uuid == null) {
+ return getMaterializedName();
+ } else
+ return MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + database.getName() + "_" + name + "_"
+ + uuid.replaceAll("-", "_");
+ }
+
@Override
public String toString() {
return "TableDesc{" + "name='" + name + '\'' + ", columns=" + Arrays.toString(columns) + ", sourceType="
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index f79d0f0..43df3f1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -43,7 +43,7 @@ public interface ISource extends Closeable {
/**
* Return a ReadableTable that can iterate through the rows of given table.
*/
- IReadableTable createReadableTable(TableDesc tableDesc);
+ IReadableTable createReadableTable(TableDesc tableDesc, String uuid);
/**
* Give the source a chance to enrich a SourcePartition before build start.
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java
index 62c4368..03559bc 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java
@@ -140,8 +140,8 @@ public class SourceManager {
});
}
- public static IReadableTable createReadableTable(TableDesc table) {
- return getSource(table).createReadableTable(table);
+ public static IReadableTable createReadableTable(TableDesc table, String uuid) {
+ return getSource(table).createReadableTable(table, uuid);
}
public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index f650321..c259c4e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -36,7 +36,7 @@ public interface IMRInput {
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
/** Return an InputFormat that reads from specified table. */
- public IMRTableInputFormat getTableInputFormat(TableDesc table);
+ public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid);
/** Return a helper to participate in batch cubing merge job flow. */
public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 5b1f38c..5f27bf8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -170,6 +170,7 @@ public class JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId));
appendExecCmdParameters(cmd, BatchConstants.ARG_DICT_PATH, getDictRootPath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
buildDictionaryStep.setJobParams(cmd.toString());
buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 85a425c..3a0fb84 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -42,13 +42,13 @@ public class MRUtil {
return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
}
- public static IMRTableInputFormat getTableInputFormat(String tableName, String prj) {
+ public static IMRTableInputFormat getTableInputFormat(String tableName, String prj, String uuid) {
TableDesc t = getTableDesc(tableName, prj);
- return SourceManager.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t);
+ return SourceManager.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t, uuid);
}
- public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
- return SourceManager.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
+ public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc, String uuid) {
+ return SourceManager.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc, uuid);
}
private static TableDesc getTableDesc(String tableName, String prj) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index e01da9e..aeb7b12 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -59,16 +59,18 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_DICT_PATH);
+ options.addOption(OPTION_CUBING_JOB_ID);
parseOptions(options, args);
final String cubeName = getOptionValue(OPTION_CUBE_NAME);
final String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+ final String jobId = getOptionValue(OPTION_CUBING_JOB_ID);
final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
final String dictPath = getOptionValue(OPTION_DICT_PATH);
final KylinConfig config = KylinConfig.getInstanceFromEnv();
- DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() {
+ DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, jobId, new DistinctColumnValuesProvider() {
@Override
public IReadableTable getDistinctValuesFor(TblColRef col) {
return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getIdentity(), col.getType());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
index c64694c..753b67c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
@@ -61,7 +61,7 @@ public class LookupSnapshotToMetaStoreStep extends AbstractExecutable {
CubeDesc cubeDesc = cube.getDescriptor();
try {
TableDesc tableDesc = metaMgr.getTableDesc(lookupTableName, cube.getProject());
- IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
+ IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null);
logger.info("take snapshot for table:" + lookupTableName);
SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cube.getConfig());
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
index 872f570..efdf54b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -56,7 +56,7 @@ public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
public void basicTest() throws Exception {
String tableName = "EDW.TEST_SITES";
TableDesc tableDesc = TableMetadataManager.getInstance(getTestConfig()).getTableDesc(tableName, "default");
- IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
+ IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null);
String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc, getTestConfig()).getResourcePath();
snapshotMgr.wipeoutCache();
diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java b/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java
index 2b1bf8e..dcfa19d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java
@@ -165,6 +165,10 @@ public class CnMessage extends Message {
return "Cube 不能被重命名";
}
+ public String getREBUILD_SNAPSHOT_OF_VIEW() {
+ return "不支持重新构建 Hive view '%s' 的 snapshot, 请刷新 Cube 的 segment";
+ }
+
// Model
public String getINVALID_MODEL_DEFINITION() {
return "非法模型定义";
diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java
index 5f7e296..7c0dbe3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java
@@ -165,6 +165,10 @@ public class Message {
return "Cube renaming is not allowed.";
}
+ public String getREBUILD_SNAPSHOT_OF_VIEW() {
+ return "Rebuild snapshot of hive view '%s' is not supported, please refresh segment of the cube";
+ }
+
// Model
public String getINVALID_MODEL_DEFINITION() {
return "The data model definition is invalid.";
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 5e2c49e..c5178ab 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -57,6 +57,7 @@ import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.project.RealizationEntry;
@@ -490,8 +491,14 @@ public class CubeService extends BasicService implements InitializingBean {
public CubeInstance rebuildLookupSnapshot(CubeInstance cube, String segmentName, String lookupTable)
throws IOException {
aclEvaluate.checkProjectOperationPermission(cube);
+ Message msg = MsgPicker.getMsg();
+ TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject());
+ if (tableDesc.isView()) {
+ throw new BadRequestException(
+ String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName()));
+ }
CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY);
- getCubeManager().buildSnapshotTable(seg, lookupTable);
+ getCubeManager().buildSnapshotTable(seg, lookupTable, null);
return cube;
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 8f8658c..71509bc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -66,6 +66,7 @@ import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentRange.TSRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.msg.Message;
@@ -394,6 +395,12 @@ public class JobService extends BasicService implements InitializingBean {
public JobInstance submitLookupSnapshotJob(CubeInstance cube, String lookupTable, List<String> segmentIDs,
String submitter) throws IOException {
+ Message msg = MsgPicker.getMsg();
+ TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject());
+ if (tableDesc.isView()) {
+ throw new BadRequestException(
+ String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName()));
+ }
LookupSnapshotBuildJob job = new LookupSnapshotJobBuilder(cube, lookupTable, segmentIDs, submitter).build();
getExecutableManager().addJob(job);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 8748009..1bb03e4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -373,7 +373,7 @@ public class TableService extends BasicService {
public List<TableSnapshotResponse> getLookupTableSnapshots(String project, String tableName) throws IOException {
TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
- IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
+ IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null);
TableSignature signature = hiveTable.getSignature();
return internalGetLookupTableSnapshots(tableName, signature);
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 9a2c242..94c1a02 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -50,8 +50,8 @@ public class HiveInputBase {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HiveInputBase.class);
- protected static String getTableNameForHCat(TableDesc table) {
- String tableName = (table.isView()) ? table.getMaterializedName() : table.getName();
+ protected static String getTableNameForHCat(TableDesc table, String uuid) {
+ String tableName = (table.isView()) ? table.getMaterializedName(uuid) : table.getName();
String database = (table.isView()) ? KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable()
: table.getDatabase();
return String.format("%s.%s", database, tableName).toUpperCase();
@@ -93,7 +93,7 @@ public class HiveInputBase {
}
protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
- String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables) {
+ String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables, String uuid) {
ShellExecutable step = new ShellExecutable();
step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
@@ -118,8 +118,8 @@ public class HiveInputBase {
hiveCmdBuilder.addStatement(hiveInitStatements);
for (TableDesc lookUpTableDesc : lookupViewsTables) {
String identity = lookUpTableDesc.getIdentity();
- String intermediate = lookUpTableDesc.getMaterializedName();
if (lookUpTableDesc.isView()) {
+ String intermediate = lookUpTableDesc.getMaterializedName(uuid);
String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir);
hiveCmdBuilder.addStatement(materializeViewHql);
intermediateTables.add(intermediate);
@@ -134,7 +134,7 @@ public class HiveInputBase {
protected static String materializeViewHql(String viewName, String tableName, String jobWorkingDir) {
StringBuilder createIntermediateTableHql = new StringBuilder();
createIntermediateTableHql.append("DROP TABLE IF EXISTS " + viewName + ";\n");
- createIntermediateTableHql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + viewName + " LIKE " + tableName
+ createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS " + viewName + " LIKE " + tableName
+ " LOCATION '" + jobWorkingDir + "/" + viewName + "';\n");
createIntermediateTableHql.append("ALTER TABLE " + viewName + " SET TBLPROPERTIES('auto.purge'='true');\n");
createIntermediateTableHql.append("INSERT OVERWRITE TABLE " + viewName + " SELECT * FROM " + tableName + ";\n");
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 33b1059..d6b85ed 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -58,8 +58,8 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
}
@Override
- public IMRTableInputFormat getTableInputFormat(TableDesc table) {
- return new HiveTableInputFormat(getTableNameForHCat(table));
+ public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
+ return new HiveTableInputFormat(getTableNameForHCat(table, uuid));
}
@Override
@@ -139,7 +139,8 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
// then count and redistribute
if (cubeConfig.isHiveRedistributeEnabled()) {
- jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor()));
+ jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
+ cubeInstance.getDescriptor()));
}
// special for hive
@@ -158,7 +159,8 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
- AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, flatDesc, hiveViewIntermediateTables);
+ AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
+ flatDesc, hiveViewIntermediateTables, jobFlow.getId());
if (task != null) {
jobFlow.addTask(task);
}
@@ -194,7 +196,8 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
* @deprecated For backwards compatibility.
*/
@Deprecated
- public static class RedistributeFlatHiveTableStep extends org.apache.kylin.source.hive.RedistributeFlatHiveTableStep {
+ public static class RedistributeFlatHiveTableStep
+ extends org.apache.kylin.source.hive.RedistributeFlatHiveTableStep {
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index 938114c..b536bf0 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -54,12 +54,12 @@ public class HiveSource implements ISource {
}
@Override
- public IReadableTable createReadableTable(TableDesc tableDesc) {
+ public IReadableTable createReadableTable(TableDesc tableDesc, String uuid) {
// hive view must have been materialized already
// ref HiveMRInput.createLookupHiveViewMaterializationStep()
if (tableDesc.isView()) {
KylinConfig config = KylinConfig.getInstanceFromEnv();
- String tableName = tableDesc.getMaterializedName();
+ String tableName = tableDesc.getMaterializedName(uuid);
tableDesc = new TableDesc();
tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable());
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
index 881be1a..d710db7 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
@@ -85,7 +85,8 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
// then count and redistribute
if (cubeConfig.isHiveRedistributeEnabled()) {
- jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor()));
+ jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc,
+ cubeInstance.getDescriptor()));
}
// special for hive
@@ -96,7 +97,8 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
- AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, flatDesc, hiveViewIntermediateTables);
+ AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
+ flatDesc, hiveViewIntermediateTables, jobFlow.getId());
if (task != null) {
jobFlow.addTask(task);
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index da44ea5..18d14b8 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -64,7 +64,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
String project = conf.get(BatchConstants.CFG_PROJECT_NAME);
String tableName = conf.get(BatchConstants.CFG_TABLE_NAME);
tableDesc = TableMetadataManager.getInstance(config).getTableDesc(tableName, project);
- tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
+ tableInputFormat = MRUtil.getTableInputFormat(tableDesc, conf.get(BatchConstants.ARG_CUBING_JOB_ID));
}
@Override
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index dd32a58..f51fce0 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -50,7 +50,8 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job";
@SuppressWarnings("static-access")
- protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
+ protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true)
+ .withDescription("The hive table name").create("table");
public HiveColumnCardinalityJob() {
}
@@ -90,7 +91,8 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false");
// Mapper
- IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project);
+ IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project,
+ getOptionValue(OPTION_CUBING_JOB_ID));
tableInputFormat.configureJob(job);
job.setMapperClass(ColumnCardinalityMapper.class);
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index 37d119e..20e882a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -51,7 +51,7 @@ public class JdbcSource implements ISource {
}
@Override
- public IReadableTable createReadableTable(TableDesc tableDesc) {
+ public IReadableTable createReadableTable(TableDesc tableDesc, String uuid) {
return new JdbcTable(tableDesc);
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 2c95c1c..73b224e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -61,7 +61,7 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
}
@Override
- public IMRTableInputFormat getTableInputFormat(TableDesc table) {
+ public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) {
return new KafkaTableInputFormat(cubeSegment, null);
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 264f2ce..0d9c845 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -71,7 +71,7 @@ public class KafkaSource implements ISource {
}
@Override
- public IReadableTable createReadableTable(TableDesc tableDesc) {
+ public IReadableTable createReadableTable(TableDesc tableDesc, String uuid) {
throw new UnsupportedOperationException();
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
index fb5bab5..1d9181b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
@@ -82,7 +82,7 @@ public class HBaseLookupMRSteps {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName, cube.getProject());
- IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc);
+ IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc, context.getJobFlow().getId());
try {
ExtTableSnapshotInfo latestSnapshot = extTableSnapshotInfoManager.getLatestSnapshot(
sourceTable.getSignature(), tableName);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
index 054e146..199a1fe 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
@@ -90,6 +90,7 @@ public class LookupTableToHFileJob extends AbstractHadoopJob {
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
String tableName = getOptionValue(OPTION_TABLE_NAME);
String lookupSnapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID);
+ String jobId = getOptionValue(OPTION_CUBING_JOB_ID);
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
@@ -101,7 +102,7 @@ public class LookupTableToHFileJob extends AbstractHadoopJob {
ExtTableSnapshotInfoManager extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
removeSnapshotIfExist(extSnapshotInfoManager, kylinConfig, tableName, lookupSnapshotID);
- IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc);
+ IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc, jobId);
logger.info("create HTable for source table snapshot:{}", tableName);
Pair<String, Integer> hTableNameAndShard = createHTable(tableName, sourceTable, kylinConfig);
@@ -118,7 +119,7 @@ public class LookupTableToHFileJob extends AbstractHadoopJob {
FileOutputFormat.setOutputPath(job, output);
- IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
+ IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(tableDesc, jobId);
tableInputFormat.configureJob(job);
job.setMapperClass(LookupTableToHFileMapper.class);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java
index 4be9533..0ad63e9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.KylinConfig;
@@ -79,7 +80,8 @@ public class LookupTableToHFileMapper<KEYIN> extends KylinMapper<KEYIN, Object,
keyColumns[i] = keyColRefs[i].getName();
}
encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, shardNum);
- lookupTableInputFormat = MRUtil.getTableInputFormat(tableDesc);
+ Configuration conf = context.getConfiguration();
+ lookupTableInputFormat = MRUtil.getTableInputFormat(tableDesc, conf.get(BatchConstants.ARG_CUBING_JOB_ID));
}
@Override