You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/08/10 04:13:45 UTC
[4/6] kylin git commit: KYLIN-1726 Scalable streaming cubing
KYLIN-1726 Scalable streaming cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/23407e3d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/23407e3d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/23407e3d
Branch: refs/heads/master
Commit: 23407e3ddeff9011151d871f1f4e51c0a987564c
Parents: acde339
Author: shaofengshi <sh...@apache.org>
Authored: Wed Jul 6 11:33:30 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:23:55 2016 +0800
----------------------------------------------------------------------
.../test/java/org/apache/kylin/job/DeployUtil.java | 11 ++++-------
.../main/java/org/apache/kylin/cube/CubeSegment.java | 15 +++++++--------
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 5 ++++-
.../java/org/apache/kylin/engine/mr/IMRInput.java | 10 ++++++++++
.../main/java/org/apache/kylin/engine/mr/MRUtil.java | 4 ++++
.../localmeta/data/DEFAULT.STREAMING_TABLE.csv | 1 +
.../org/apache/kylin/source/hive/HiveMRInput.java | 10 ++++++++++
7 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 986edf6..8c64f91 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,12 +18,8 @@
package org.apache.kylin.job;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
+import java.nio.ByteBuffer;
import java.util.List;
import org.apache.commons.io.IOUtils;
@@ -165,7 +161,7 @@ public class DeployUtil {
TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
StringBuilder sb = new StringBuilder();
for (String json : data) {
- List<String> rowColumns = timedJsonStreamParser.parse((new MessageAndOffset(new Message(json.getBytes()), 0)).message().payload()).getData();
+ List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).getData();
sb.append(StringUtils.join(rowColumns, ","));
sb.append(System.getProperty("line.separator"));
}
@@ -183,6 +179,7 @@ public class DeployUtil {
in.close();
}
+
private static void deployHiveTables() throws Exception {
MetadataManager metaMgr = MetadataManager.getInstance(config());
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index abfb5ff..4697c63 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -121,19 +121,18 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable {
* returns "yyyyMMddHHmmss_yyyyMMddHHmmss"
*/
public static String makeSegmentName(long startDate, long endDate, long startOffset, long endOffset) {
- if (startOffset == 0 && endOffset == 0) {
- startOffset = startDate;
- endOffset = endDate;
- }
+ if (startOffset != 0 || endOffset != 0) {
+ if (startOffset == 0 && (endOffset == 0 || endOffset == Long.MAX_VALUE)) {
+ return "FULL_BUILD";
+ }
- if (startOffset == 0 && (endOffset == 0 || endOffset == Long.MAX_VALUE)) {
- return "FULL_BUILD";
+ return startOffset + "_" + endOffset;
}
+ // using time
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
-
- return dateFormat.format(startOffset) + "_" + dateFormat.format(endOffset);
+ return dateFormat.format(startDate) + "_" + dateFormat.format(endDate);
}
// ============================================================================
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 33081c7..10483eb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -36,10 +36,12 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class);
private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
+ private final IMRInput.IMRBatchMergeInputSide inputSide;
public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
- this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment) seg);
+ this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
+ this.inputSide = MRUtil.getBatchMergeInputSide(seg);
}
public CubingJob build() {
@@ -57,6 +59,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
}
// Phase 1: Merge Dictionary
+ inputSide.addStepPhase1_MergeDictionary(result);
result.addTask(createMergeDictionaryStep(mergingSegmentIds));
result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
outputSide.addStepPhase1_MergeDictionary(result);
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 336a66f..6e01877 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -34,6 +34,9 @@ public interface IMRInput {
/** Return an InputFormat that reads from specified table. */
public IMRTableInputFormat getTableInputFormat(TableDesc table);
+ /** Return a helper to participate in batch cubing merge job flow. */
+ public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg);
+
/**
* Utility that configures mapper to read from a table.
*/
@@ -66,4 +69,11 @@ public interface IMRInput {
/** Add step that does necessary clean up, like delete the intermediate flat table */
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
+
+ public interface IMRBatchMergeInputSide {
+
+ /** Add step that executes before merge dictionary and before merge cube. */
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 1a86329..14fdd93 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -68,6 +68,10 @@ public class MRUtil {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
}
+ public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+ return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
+ }
+
// use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale
// Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
public static int runMRJob(Tool tool, String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
index e69de29..8b13789 100644
--- a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
+++ b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
@@ -0,0 +1 @@
+
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 0c969f2..9ec0f02 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -65,6 +65,16 @@ public class HiveMRInput implements IMRInput {
return new HiveTableInputFormat(table.getIdentity());
}
+ @Override
+ public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+ return new IMRBatchMergeInputSide() {
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ // doing nothing
+ }
+ };
+ }
+
public static class HiveTableInputFormat implements IMRTableInputFormat {
final String dbName;
final String tableName;