Posted to commits@kylin.apache.org by sh...@apache.org on 2016/08/10 04:13:45 UTC

[4/6] kylin git commit: KYLIN-1726 Scalable streaming cubing

KYLIN-1726 Scalable streaming cubing

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/23407e3d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/23407e3d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/23407e3d

Branch: refs/heads/master
Commit: 23407e3ddeff9011151d871f1f4e51c0a987564c
Parents: acde339
Author: shaofengshi <sh...@apache.org>
Authored: Wed Jul 6 11:33:30 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:23:55 2016 +0800

----------------------------------------------------------------------
 .../test/java/org/apache/kylin/job/DeployUtil.java   | 11 ++++-------
 .../main/java/org/apache/kylin/cube/CubeSegment.java | 15 +++++++--------
 .../kylin/engine/mr/BatchMergeJobBuilder2.java       |  5 ++++-
 .../java/org/apache/kylin/engine/mr/IMRInput.java    | 10 ++++++++++
 .../main/java/org/apache/kylin/engine/mr/MRUtil.java |  4 ++++
 .../localmeta/data/DEFAULT.STREAMING_TABLE.csv       |  1 +
 .../org/apache/kylin/source/hive/HiveMRInput.java    | 10 ++++++++++
 7 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 986edf6..8c64f91 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,12 +18,8 @@
 
 package org.apache.kylin.job;
 
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.commons.io.IOUtils;
@@ -165,7 +161,7 @@ public class DeployUtil {
         TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
-            List<String> rowColumns = timedJsonStreamParser.parse((new MessageAndOffset(new Message(json.getBytes()), 0)).message().payload()).getData();
+            List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).getData();
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }
@@ -183,6 +179,7 @@ public class DeployUtil {
         in.close();
     }
 
+
     private static void deployHiveTables() throws Exception {
 
         MetadataManager metaMgr = MetadataManager.getInstance(config());

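The DeployUtil change above decouples the test utility from Kafka's MessageAndOffset wrapper: the parser now receives a plain java.nio.ByteBuffer. A minimal sketch of the new call path inside the existing loop over the sample data (the explicit UTF-8 charset is an assumption added for clarity; the diff itself uses the platform default via json.getBytes()):

    // Sketch only: feed raw JSON bytes to TimedJsonStreamParser without any Kafka wrapper.
    ByteBuffer buf = ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
    List<String> rowColumns = timedJsonStreamParser.parse(buf).getData();
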
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index abfb5ff..4697c63 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -121,19 +121,18 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable {
      * returns "yyyyMMddHHmmss_yyyyMMddHHmmss"
      */
     public static String makeSegmentName(long startDate, long endDate, long startOffset, long endOffset) {
-        if (startOffset == 0 && endOffset == 0) {
-            startOffset = startDate;
-            endOffset = endDate;
-        }
+        if (startOffset != 0 || endOffset != 0) {
+            if (startOffset == 0 && (endOffset == 0 || endOffset == Long.MAX_VALUE)) {
+                return "FULL_BUILD";
+            }
 
-        if (startOffset == 0 && (endOffset == 0 || endOffset == Long.MAX_VALUE)) {
-            return "FULL_BUILD";
+            return startOffset + "_" + endOffset;
         }
 
+        // using time
         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
         dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-        return dateFormat.format(startOffset) + "_" + dateFormat.format(endOffset);
+        return dateFormat.format(startDate) + "_" + dateFormat.format(endDate);
     }
 
     // ============================================================================

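With the reordered logic above, offsets take precedence over dates when naming a segment: an unbounded offset range (startOffset 0, endOffset Long.MAX_VALUE) yields "FULL_BUILD", any other non-zero offset pair yields "startOffset_endOffset", and only when both offsets are zero is the GMT-formatted date pair used. A few illustrative calls, with input values chosen here purely as assumptions:

    // Sketch of expected results under the new makeSegmentName logic; inputs are illustrative.
    makeSegmentName(0, 0, 100, 200);                        // offsets present        -> "100_200"
    makeSegmentName(0, 0, 0, Long.MAX_VALUE);               // unbounded offset range -> "FULL_BUILD"
    makeSegmentName(1404086400000L, 1404172800000L, 0, 0);  // no offsets, dates used -> "20140630000000_20140701000000"
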
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 33081c7..10483eb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -36,10 +36,12 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
     private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class);
 
     private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
+    private final IMRInput.IMRBatchMergeInputSide inputSide;
 
     public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
-        this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment) seg);
+        this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
+        this.inputSide = MRUtil.getBatchMergeInputSide(seg);
     }
 
     public CubingJob build() {
@@ -57,6 +59,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         }
 
         // Phase 1: Merge Dictionary
+        inputSide.addStepPhase1_MergeDictionary(result);
         result.addTask(createMergeDictionaryStep(mergingSegmentIds));
         result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
         outputSide.addStepPhase1_MergeDictionary(result);

http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 336a66f..6e01877 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -34,6 +34,9 @@ public interface IMRInput {
     /** Return an InputFormat that reads from specified table. */
     public IMRTableInputFormat getTableInputFormat(TableDesc table);
 
+    /** Return a helper to participate in batch cubing merge job flow. */
+    public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg);
+
     /**
      * Utility that configures mapper to read from a table.
      */
@@ -66,4 +69,11 @@ public interface IMRInput {
         /** Add step that does necessary clean up, like delete the intermediate flat table */
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
+
+    public interface IMRBatchMergeInputSide {
+
+        /** Add step that executes before merge dictionary and before merge cube. */
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+    }
 }

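The new IMRBatchMergeInputSide extension point lets a source inject work ahead of the dictionary and cube merge. The Hive implementation further down in this commit is a no-op; a streaming source could use the hook roughly as sketched below (MergeOffsetStep is a hypothetical class name used only for illustration, not part of this commit):

    // Hypothetical sketch of a source-side merge hook; names are illustrative.
    public IMRBatchMergeInputSide getBatchMergeInputSide(final CubeSegment seg) {
        return new IMRBatchMergeInputSide() {
            @Override
            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
                // e.g. reconcile source offsets of the merging segments before dictionaries are merged
                jobFlow.addTask(new MergeOffsetStep(seg));  // MergeOffsetStep is an assumed, illustrative step
            }
        };
    }
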
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 1a86329..14fdd93 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -68,6 +68,10 @@ public class MRUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
     }
 
+    public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
+    }
+
     // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale
     // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
     public static int runMRJob(Tool tool, String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
index e69de29..8b13789 100644
--- a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
+++ b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
@@ -0,0 +1 @@
+

http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 0c969f2..9ec0f02 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -65,6 +65,16 @@ public class HiveMRInput implements IMRInput {
         return new HiveTableInputFormat(table.getIdentity());
     }
 
+    @Override
+    public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+        return new IMRBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // doing nothing
+            }
+        };
+    }
+
     public static class HiveTableInputFormat implements IMRTableInputFormat {
         final String dbName;
         final String tableName;