You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/08/10 04:13:42 UTC

[1/6] kylin git commit: refactor some streaming classes

Repository: kylin
Updated Branches:
  refs/heads/master 2146f2b00 -> c67891d26


refactor some streaming classes


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a2b693c7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a2b693c7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a2b693c7

Branch: refs/heads/master
Commit: a2b693c7955b4e5c436f4f815cd5588da93f7e98
Parents: 2146f2b
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jun 23 10:34:52 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:09:22 2016 +0800

----------------------------------------------------------------------
 .../src/test/java/org/apache/kylin/job/DeployUtil.java  |  2 +-
 .../org/apache/kylin/common/util/StreamingMessage.java  |  4 ++++
 .../apache/kylin/source/kafka/KafkaStreamingInput.java  |  3 ++-
 .../org/apache/kylin/source/kafka/StreamingParser.java  |  3 ++-
 .../kylin/source/kafka/StringStreamingParser.java       | 12 ++++--------
 .../kylin/source/kafka/TimedJsonStreamParser.java       | 12 ++++--------
 .../kylin/source/kafka/diagnose/KafkaInputAnalyzer.java |  3 ++-
 .../org/apache/kylin/source/kafka/util/KafkaUtils.java  |  3 ++-
 8 files changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index da97df3..6128770 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -166,7 +166,7 @@ public class DeployUtil {
         TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
-            List<String> rowColumns = timedJsonStreamParser.parse(new MessageAndOffset(new Message(json.getBytes()), 0)).getData();
+            List<String> rowColumns = timedJsonStreamParser.parse((new MessageAndOffset(new Message(json.getBytes()), 0)).message().payload()).getData();
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
index f327db2..53ab195 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
@@ -51,6 +51,10 @@ public class StreamingMessage {
         return offset;
     }
 
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
     public final long getTimestamp() {
         return timestamp;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 4a3c2a9..fe3fe0a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -205,7 +205,8 @@ public class KafkaStreamingInput implements IStreamingInput {
                     for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
                         offset++;
                         consumeMsgCount++;
-                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+                        streamingMessage.setOffset(messageAndOffset.offset());
                         if (streamingParser.filter(streamingMessage)) {
                             final long timestamp = streamingMessage.getTimestamp();
                             if (timestamp >= timeRange.getFirst() && timestamp < timeRange.getSecond()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 89b9b56..cb6a72b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.common.util.TimeUtil;
@@ -66,7 +67,7 @@ public abstract class StreamingParser {
      * @param message
      * @return StreamingMessage must not be NULL
      */
-    abstract public StreamingMessage parse(Object message);
+    abstract public StreamingMessage parse(ByteBuffer message);
 
     abstract public boolean filter(StreamingMessage streamingMessage);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index cb826cb..8888d67 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -43,8 +43,6 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Lists;
 
-import kafka.message.MessageAndOffset;
-
 /**
  */
 public final class StringStreamingParser extends StreamingParser {
@@ -55,12 +53,10 @@ public final class StringStreamingParser extends StreamingParser {
     }
 
     @Override
-    public StreamingMessage parse(Object message) {
-        MessageAndOffset kafkaMessage = (MessageAndOffset) message;
-        final ByteBuffer payload = kafkaMessage.message().payload();
-        byte[] bytes = new byte[payload.limit()];
-        payload.get(bytes);
-        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object> emptyMap());
+    public StreamingMessage parse(ByteBuffer message) {
+        byte[] bytes = new byte[message.limit()];
+        message.get(bytes);
+        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), 0, 0, Collections.<String, Object> emptyMap());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 51c490e..d4308db 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -41,8 +41,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-
 import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -54,8 +54,6 @@ import com.fasterxml.jackson.databind.type.MapType;
 import com.fasterxml.jackson.databind.type.SimpleType;
 import com.google.common.collect.Lists;
 
-import kafka.message.MessageAndOffset;
-
 /**
  * each json message with a "timestamp" field
  */
@@ -99,10 +97,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
     }
 
     @Override
-    public StreamingMessage parse(Object msg) {
-        MessageAndOffset messageAndOffset = (MessageAndOffset) msg;
+    public StreamingMessage parse(ByteBuffer buffer) {
         try {
-            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
             Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
             root.putAll(message);
             String tsStr = root.get(tsColName);
@@ -123,8 +120,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
                 }
             }
 
-            return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object> emptyMap());
-
+            return new StreamingMessage(result, 0, t, Collections.<String, Object>emptyMap());
         } catch (IOException e) {
             logger.error("error", e);
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index 752ddd7..efaa042 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -109,7 +109,8 @@ public class KafkaInputAnalyzer extends AbstractApplication {
                         offset++;
                         consumeMsgCount++;
 
-                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+                        streamingMessage.setOffset(messageAndOffset.offset());
                         if (streamingParser.filter(streamingMessage)) {
                             streamQueue.add(streamingMessage);
                         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index 0d8499d..24eaa05 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -162,7 +162,8 @@ public final class KafkaUtils {
         final ByteBuffer payload = messageAndOffset.message().payload();
         byte[] bytes = new byte[payload.limit()];
         payload.get(bytes);
-        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+        streamingMessage.setOffset(messageAndOffset.offset());
         logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, streamingMessage.getTimestamp()));
         return streamingMessage.getTimestamp();
 


[2/6] kylin git commit: KYLIN-1726 Scalable streaming cubing

Posted by sh...@apache.org.
KYLIN-1726 Scalable streaming cubing

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/acde3396
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/acde3396
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/acde3396

Branch: refs/heads/master
Commit: acde339623d43fa9b441614bc64ca7960e9255fe
Parents: a2b693c
Author: shaofengshi <sh...@apache.org>
Authored: Sun Jul 3 21:43:16 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:10:10 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   | 11 ++++----
 .../kylin/job/streaming/KafkaDataLoader.java    | 11 ++++++--
 .../kylin/job/streaming/StreamDataLoader.java   | 22 +++++++++++++++
 .../java/org/apache/kylin/source/ISource.java   |  4 +++
 .../org/apache/kylin/source/SourceFactory.java  |  5 ++++
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |  1 +
 .../kylin/engine/mr/JobBuilderSupport.java      |  2 +-
 .../engine/mr/common/AbstractHadoopJob.java     | 28 ++++++++++++++++++++
 .../apache/kylin/engine/mr/steps/CuboidJob.java |  1 +
 .../engine/mr/steps/FactDistinctColumnsJob.java |  3 +++
 .../kylin/engine/mr/steps/InMemCuboidJob.java   |  1 +
 .../test_streaming_table_cube_desc.json         |  3 +++
 .../kafka/DEFAULT.STREAMING_TABLE.json          |  2 +-
 .../kafka/default.streaming_table.json          |  2 +-
 .../streaming/DEFAULT.STREAMING_TABLE.json      |  2 +-
 .../streaming/default.streaming_table.json      |  2 +-
 .../table/DEFAULT.STREAMING_TABLE.json          |  3 ++-
 .../kylin/provision/BuildCubeWithStream.java    |  3 ++-
 .../apache/kylin/source/hive/HiveSource.java    |  8 ++++++
 19 files changed, 99 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 6128770..986edf6 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -36,7 +36,7 @@ import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.dataGen.FactTableGenerator;
-import org.apache.kylin.job.streaming.KafkaDataLoader;
+import org.apache.kylin.job.streaming.StreamDataLoader;
 import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
@@ -45,7 +45,6 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.hive.HiveClient;
 import org.apache.kylin.source.hive.HiveCmdBuilder;
 import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.maven.model.Model;
 import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
 import org.slf4j.Logger;
@@ -148,15 +147,15 @@ public class DeployUtil {
         deployHiveTables();
     }
 
-    public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, KafkaConfig kafkaConfig) throws IOException {
+    public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, StreamDataLoader streamDataLoader) throws IOException {
         CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
         List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
         List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable());
         TableDesc tableDesc = cubeInstance.getFactTableDesc();
         //load into kafka
-        KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data);
-        KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data2);
-        logger.info("Write {} messages into topic {}", data.size(), kafkaConfig.getTopic());
+        streamDataLoader.loadIntoKafka(data);
+        streamDataLoader.loadIntoKafka(data2);
+        logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString());
 
         //csv data for H2 use
         List<TblColRef> tableColumns = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index 5242ff2..0eaae20 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -30,6 +30,7 @@ import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
 
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
@@ -38,9 +39,15 @@ import kafka.producer.ProducerConfig;
 /**
  * Load prepared data into kafka(for test use)
  */
-public class KafkaDataLoader {
+public class KafkaDataLoader extends StreamDataLoader {
+    List<KafkaClusterConfig> kafkaClusterConfigs;
 
-    public static void loadIntoKafka(List<KafkaClusterConfig> kafkaClusterConfigs, List<String> messages) {
+    public KafkaDataLoader(KafkaConfig kafkaConfig) {
+        super(kafkaConfig);
+        this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs();
+    }
+
+    public void loadIntoKafka(List<String> messages) {
 
         KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
         String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
new file mode 100644
index 0000000..50fc883
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
@@ -0,0 +1,22 @@
+package org.apache.kylin.job.streaming;
+
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+
+import java.util.List;
+
+/**
+ */
+public abstract class StreamDataLoader {
+    protected KafkaConfig kafkaConfig;
+    public StreamDataLoader(KafkaConfig kafkaConfig) {
+        this.kafkaConfig = kafkaConfig;
+    }
+
+    abstract public void loadIntoKafka(List<String> messages);
+
+    @Override
+    public String toString() {
+        return "kafka topic " + kafkaConfig.getTopic();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 3cd8a02..e9216f9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -20,9 +20,13 @@ package org.apache.kylin.source;
 
 import org.apache.kylin.metadata.model.TableDesc;
 
+import java.util.List;
+
 public interface ISource {
 
     public <I> I adaptToBuildEngine(Class<I> engineInterface);
 
     public ReadableTable createReadableTable(TableDesc tableDesc);
+
+    public List<String> getMRDependentResources(TableDesc table);
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
index f701a0f..e82c6ed 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.source;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
@@ -45,4 +46,8 @@ public class SourceFactory {
         return tableSource(table).adaptToBuildEngine(engineInterface);
     }
 
+    public static List<String> getMRDependentResources(TableDesc table) {
+        return tableSource(table).getMRDependentResources(table);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index ec9b1c6..5a098a8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -99,6 +99,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 3e9aff6..86451c9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -73,7 +73,7 @@ public class JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId));
         appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
-
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
         result.setMapReduceParams(cmd.toString());
         return result;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 02928e0..04ecc71 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -29,6 +29,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.regex.Matcher;
@@ -67,6 +68,7 @@ import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.SourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -164,6 +166,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
 
         String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
         String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
+        String kylinKafkaDependency = System.getProperty("kylin.kafka.dependency");
         logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
 
         Configuration jobConf = job.getConfiguration();
@@ -221,6 +224,29 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             }
         }
 
+        // for kafka dependencies
+        if (kylinKafkaDependency != null) {
+            kylinKafkaDependency = kylinKafkaDependency.replace(":", ",");
+
+            logger.info("Kafka Dependencies Before Filtered: " + kylinHiveDependency);
+
+            if (kylinDependency.length() > 0)
+                kylinDependency.append(",");
+            kylinDependency.append(kylinKafkaDependency);
+        } else {
+
+            logger.info("No Kafka dependency jars set in the environment, will find them from jvm:");
+
+            try {
+                String kafkaClientJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer"));
+                kylinDependency.append(kafkaClientJarPath).append(",");
+                logger.info("kafka jar file: " + kafkaClientJarPath);
+
+            } catch (ClassNotFoundException e) {
+                logger.error("Cannot found kafka dependency jars: " + e);
+            }
+        }
+
         // for KylinJobMRLibDir
         String mrLibDir = kylinConf.getKylinJobMRLibDir();
         if (!StringUtils.isBlank(mrLibDir)) {
@@ -442,6 +468,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
             TableDesc table = metaMgr.getTableDesc(tableName);
             dumpList.add(table.getResourcePath());
+            List<String> dependentResources = SourceFactory.getMRDependentResources(table);
+            dumpList.addAll(dependentResources);
         }
         for (CubeSegment segment : cube.getSegments()) {
             dumpList.addAll(segment.getDictionaryPaths());

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index f037d2e..f3524f8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -109,6 +109,7 @@ public class CuboidJob extends AbstractHadoopJob {
             }
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 39aae72..f091ab9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -54,6 +54,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_SEGMENT_NAME);
             options.addOption(OPTION_STATISTICS_ENABLED);
@@ -62,6 +63,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             parseOptions(options, args);
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String job_id = getOptionValue(OPTION_CUBING_JOB_ID);
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id);
             String cubeName = getOptionValue(OPTION_CUBE_NAME);
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index f5076e4..510dbe8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -95,6 +95,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             }
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
index 0267db5..23e5b00 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
@@ -105,6 +105,9 @@
       "joint_dims" : [ ]
     }
   } ],
+  "override_kylin_properties": {
+    "kylin.cube.algorithm": "random"
+  },
   "notify_list" : [ ],
   "status_need_notify" : [ ],
   "auto_merge_time_ranges" : null,

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
index c97927d..6a64cce 100644
--- a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
@@ -1,7 +1,7 @@
 {
  
   "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
-  "name": "default.streaming_table",
+  "name": "DEFAULT.STREAMING_TABLE",
   "topic": "test_streaming_table_topic_xyz",
   "timeout": 60000,
   "bufferSize": 65536,

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/kafka/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/default.streaming_table.json b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
index c97927d..6a64cce 100644
--- a/examples/test_case_data/localmeta/kafka/default.streaming_table.json
+++ b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
@@ -1,7 +1,7 @@
 {
  
   "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
-  "name": "default.streaming_table",
+  "name": "DEFAULT.STREAMING_TABLE",
   "topic": "test_streaming_table_topic_xyz",
   "timeout": 60000,
   "bufferSize": 65536,

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
index 6eb4a88..85a477b 100644
--- a/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
@@ -1,6 +1,6 @@
 {
   "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
-  "name": "default.streaming_table",
+  "name": "DEFAULT.STREAMING_TABLE",
   "type": "kafka",
   "last_modified": 0
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/streaming/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/default.streaming_table.json b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
index 6eb4a88..85a477b 100644
--- a/examples/test_case_data/localmeta/streaming/default.streaming_table.json
+++ b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
@@ -1,6 +1,6 @@
 {
   "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
-  "name": "default.streaming_table",
+  "name": "DEFAULT.STREAMING_TABLE",
   "type": "kafka",
   "last_modified": 0
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
index 82f6fdb..5bcfa35 100644
--- a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
@@ -41,5 +41,6 @@
   ],
   "database": "DEFAULT",
   "source_type": 1,
-  "last_modified": 0
+  "last_modified": 0,
+  "source_type" : 1
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 95f0f3d..9490560 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -33,6 +33,7 @@ import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.streaming.KafkaDataLoader;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -99,7 +100,7 @@ public class BuildCubeWithStream {
         streamingConfig.setTopic(UUID.randomUUID().toString());
         KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig);
 
-        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, streamingConfig);
+        DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, new KafkaDataLoader(streamingConfig));
     }
 
     public void cleanup() throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index b7dbff0..e9cebea 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -18,11 +18,14 @@
 
 package org.apache.kylin.source.hive;
 
+import com.google.common.collect.Lists;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ReadableTable;
 
+import java.util.List;
+
 //used by reflection
 public class HiveSource implements ISource {
 
@@ -41,4 +44,9 @@ public class HiveSource implements ISource {
         return new HiveTable(tableDesc);
     }
 
+    @Override
+    public List<String> getMRDependentResources(TableDesc table) {
+        return Lists.newArrayList();
+    }
+
 }


[4/6] kylin git commit: KYLIN-1726 Scalable streaming cubing

Posted by sh...@apache.org.
KYLIN-1726 Scalable streaming cubing

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/23407e3d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/23407e3d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/23407e3d

Branch: refs/heads/master
Commit: 23407e3ddeff9011151d871f1f4e51c0a987564c
Parents: acde339
Author: shaofengshi <sh...@apache.org>
Authored: Wed Jul 6 11:33:30 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:23:55 2016 +0800

----------------------------------------------------------------------
 .../test/java/org/apache/kylin/job/DeployUtil.java   | 11 ++++-------
 .../main/java/org/apache/kylin/cube/CubeSegment.java | 15 +++++++--------
 .../kylin/engine/mr/BatchMergeJobBuilder2.java       |  5 ++++-
 .../java/org/apache/kylin/engine/mr/IMRInput.java    | 10 ++++++++++
 .../main/java/org/apache/kylin/engine/mr/MRUtil.java |  4 ++++
 .../localmeta/data/DEFAULT.STREAMING_TABLE.csv       |  1 +
 .../org/apache/kylin/source/hive/HiveMRInput.java    | 10 ++++++++++
 7 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 986edf6..8c64f91 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,12 +18,8 @@
 
 package org.apache.kylin.job;
 
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.commons.io.IOUtils;
@@ -165,7 +161,7 @@ public class DeployUtil {
         TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
-            List<String> rowColumns = timedJsonStreamParser.parse((new MessageAndOffset(new Message(json.getBytes()), 0)).message().payload()).getData();
+            List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).getData();
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }
@@ -183,6 +179,7 @@ public class DeployUtil {
         in.close();
     }
 
+
     private static void deployHiveTables() throws Exception {
 
         MetadataManager metaMgr = MetadataManager.getInstance(config());

http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index abfb5ff..4697c63 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -121,19 +121,18 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable {
      * returns "yyyyMMddHHmmss_yyyyMMddHHmmss"
      */
     public static String makeSegmentName(long startDate, long endDate, long startOffset, long endOffset) {
-        if (startOffset == 0 && endOffset == 0) {
-            startOffset = startDate;
-            endOffset = endDate;
-        }
+        if (startOffset != 0 || endOffset != 0) {
+            if (startOffset == 0 && (endOffset == 0 || endOffset == Long.MAX_VALUE)) {
+                return "FULL_BUILD";
+            }
 
-        if (startOffset == 0 && (endOffset == 0 || endOffset == Long.MAX_VALUE)) {
-            return "FULL_BUILD";
+            return startOffset + "_" + endOffset;
         }
 
+        // using time
         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
         dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
-
-        return dateFormat.format(startOffset) + "_" + dateFormat.format(endOffset);
+        return dateFormat.format(startDate) + "_" + dateFormat.format(endDate);
     }
 
     // ============================================================================

http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 33081c7..10483eb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -36,10 +36,12 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
     private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class);
 
     private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
+    private final IMRInput.IMRBatchMergeInputSide inputSide;
 
     public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
         super(mergeSegment, submitter);
-        this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment) seg);
+        this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
+        this.inputSide = MRUtil.getBatchMergeInputSide(seg);
     }
 
     public CubingJob build() {
@@ -57,6 +59,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         }
 
         // Phase 1: Merge Dictionary
+        inputSide.addStepPhase1_MergeDictionary(result);
         result.addTask(createMergeDictionaryStep(mergingSegmentIds));
         result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
         outputSide.addStepPhase1_MergeDictionary(result);

http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 336a66f..6e01877 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -34,6 +34,9 @@ public interface IMRInput {
     /** Return an InputFormat that reads from specified table. */
     public IMRTableInputFormat getTableInputFormat(TableDesc table);
 
+    /** Return a helper to participate in batch cubing merge job flow. */
+    public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg);
+
     /**
      * Utility that configures mapper to read from a table.
      */
@@ -66,4 +69,11 @@ public interface IMRInput {
         /** Add step that does necessary clean up, like delete the intermediate flat table */
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
+
+    public interface IMRBatchMergeInputSide {
+
+        /** Add step that executes before merge dictionary and before merge cube. */
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 1a86329..14fdd93 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -68,6 +68,10 @@ public class MRUtil {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
     }
 
+    public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+        return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
+    }
+
     // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-safe
     // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
     public static int runMRJob(Tool tool, String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
index e69de29..8b13789 100644
--- a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
+++ b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
@@ -0,0 +1 @@
+

http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 0c969f2..9ec0f02 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -65,6 +65,16 @@ public class HiveMRInput implements IMRInput {
         return new HiveTableInputFormat(table.getIdentity());
     }
 
+    @Override
+    public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+        return new IMRBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                // doing nothing
+            }
+        };
+    }
+
     public static class HiveTableInputFormat implements IMRTableInputFormat {
         final String dbName;
         final String tableName;


[3/6] kylin git commit: KYLIN-1726 add partition column in fact distinct output

Posted by sh...@apache.org.
KYLIN-1726 add partition column in fact distinct output

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/76d8672c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/76d8672c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/76d8672c

Branch: refs/heads/master
Commit: 76d8672ca77ee662bece45e1b2428d42d9beccc6
Parents: 23407e3
Author: shaofengshi <sh...@apache.org>
Authored: Fri Jul 8 10:16:53 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:23:55 2016 +0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/kylin/cube/CubeManager.java     | 8 ++++++++
 .../localmeta/table/DEFAULT.STREAMING_TABLE.json             | 1 -
 2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/76d8672c/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 0941d56..a456537 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -925,6 +925,14 @@ public class CubeManager implements IRealizationProvider {
                 factDictCols.add(col);
             }
         }
+
+        // add partition column in all case
+        if (cubeDesc.getModel().getPartitionDesc() != null) {
+            TblColRef partitionCol = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            if (factDictCols.contains(partitionCol) == false) {
+                factDictCols.add(partitionCol);
+            }
+        }
         return factDictCols;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/76d8672c/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
index 5bcfa35..64f359b 100644
--- a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
@@ -1,5 +1,4 @@
 {
- 
   "uuid": "e286e39e-40d7-44c2-8fa2-41b365123987",
   "name": "STREAMING_TABLE",
   "columns": [


[6/6] kylin git commit: KYLIN-1859 reorg imports

Posted by sh...@apache.org.
KYLIN-1859 reorg imports


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c67891d2
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c67891d2
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c67891d2

Branch: refs/heads/master
Commit: c67891d26a5a937cc4850fc389ad395d1609b575
Parents: a35dc3c
Author: shaofengshi <sh...@apache.org>
Authored: Wed Aug 10 12:13:05 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 12:13:05 2016 +0800

----------------------------------------------------------------------
 core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java     | 2 +-
 .../org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java     | 1 -
 .../src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java  | 1 -
 .../org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java   | 1 -
 .../kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java       | 1 -
 .../main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java | 1 -
 .../java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java   | 1 -
 .../java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java   | 1 -
 .../main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java | 1 -
 .../java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java | 1 -
 10 files changed, 1 insertion(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 4533ae6..2ebf5d3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -590,7 +590,7 @@ public class CubeManager implements IRealizationProvider {
         if (existing.isEmpty()) {
             return cube.getDescriptor().getPartitionDateStart();
         } else {
-            return existing.get(existing.size() - 1).getDateRangeStart();
+            return existing.get(existing.size() - 1).getDateRangeEnd();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index cc2bf7d..85e3cc7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -46,7 +46,6 @@ import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index 90dec84..0399300 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -44,7 +44,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index a4b087b..a6c4d30 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -37,7 +37,6 @@ import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 5680004..a91d333 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -36,7 +36,6 @@ import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
 /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index d50c8a5..f6ed8e7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -41,7 +41,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 75b6489..24c37ce 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -45,7 +45,6 @@ import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 906ccdc..a6bef83 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -51,7 +51,6 @@ import org.apache.kylin.measure.BufferedMeasureEncoder;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index fbd02ac..b566c2e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -38,7 +38,6 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 5c45673..f83ad7d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -50,7 +50,6 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.CuboidShardUtil;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;


[5/6] kylin git commit: KYLIN-1859 Use segment "uuid" instead of "name" to seek a segment across the system

Posted by sh...@apache.org.
KYLIN-1859 Use segment "uuid" instead of "name" to seek a segment across the system

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a35dc3cb
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a35dc3cb
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a35dc3cb

Branch: refs/heads/master
Commit: a35dc3cb6b4e5f6a89001c146648200a15456443
Parents: 76d8672
Author: shaofengshi <sh...@apache.org>
Authored: Fri Jul 8 13:29:35 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:31:42 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/KafkaDataLoader.java    |  2 +-
 .../java/org/apache/kylin/cube/CubeManager.java | 23 ++++++++++++++++----
 .../kylin/cube/cli/DictionaryGeneratorCLI.java  |  5 ++---
 .../cube/model/CubeJoinedFlatTableDesc.java     |  2 +-
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |  4 ++--
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |  6 ++---
 .../kylin/engine/mr/BatchMergeJobBuilder.java   |  2 +-
 .../kylin/engine/mr/BatchMergeJobBuilder2.java  | 15 +++++++++++++
 .../kylin/engine/mr/JobBuilderSupport.java      |  4 ++--
 .../engine/mr/common/AbstractHadoopJob.java     |  2 ++
 .../kylin/engine/mr/common/BatchConstants.java  |  4 +++-
 .../engine/mr/steps/BaseCuboidMapperBase.java   |  6 ++---
 .../engine/mr/steps/CreateDictionaryJob.java    |  6 ++---
 .../apache/kylin/engine/mr/steps/CuboidJob.java | 10 ++++-----
 .../engine/mr/steps/FactDistinctColumnsJob.java | 14 ++++++------
 .../mr/steps/FactDistinctColumnsMapperBase.java |  2 +-
 .../kylin/engine/mr/steps/InMemCuboidJob.java   |  8 +++----
 .../engine/mr/steps/InMemCuboidMapper.java      |  4 ++--
 .../kylin/engine/mr/steps/MergeCuboidJob.java   |  6 ++---
 .../engine/mr/steps/MergeCuboidMapper.java      |  6 ++---
 .../kylin/engine/mr/steps/NDCuboidMapper.java   |  6 ++---
 .../engine/mr/steps/NDCuboidMapperTest.java     |  4 ++--
 .../test_kylin_cube_with_slr_1_new_segment.json |  1 +
 .../test_streaming_table_cube_desc.json         |  2 +-
 .../storage/hbase/steps/CreateHTableJob.java    |  8 +++----
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  2 +-
 26 files changed, 94 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index 0eaae20..454f6cf 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -69,7 +69,7 @@ public class KafkaDataLoader extends StreamDataLoader {
 
         List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
         for (int i = 0; i < messages.size(); ++i) {
-            KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
+            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
             keyedMessages.add(keyedMessage);
         }
         producer.send(keyedMessages);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index a456537..4533ae6 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -421,10 +421,14 @@ public class CubeManager implements IRealizationProvider {
             // try figure out a reasonable start if missing
             if (startDate == 0 && startOffset == 0) {
                 boolean isOffsetsOn = endOffset != 0;
-                if (isOffsetsOn)
-                    startOffset = calculateStartDateForAppendSegment(cube);
-                else
+                if (isOffsetsOn) {
+                    startOffset = calculateStartOffsetForAppendSegment(cube);
+                    if (startOffset == Long.MAX_VALUE) {
+                        throw new IllegalStateException("There is already one pending for building segment, please submit request later.");
+                    }
+                } else {
                     startDate = calculateStartDateForAppendSegment(cube);
+                }
             }
         } else {
             startDate = 0;
@@ -570,12 +574,23 @@ public class CubeManager implements IRealizationProvider {
         return max;
     }
 
+
+    private long calculateStartOffsetForAppendSegment(CubeInstance cube) {
+        List<CubeSegment> existing = cube.getSegments();
+        if (existing.isEmpty()) {
+            return 0;
+        } else {
+            return existing.get(existing.size() - 1).getSourceOffsetEnd();
+        }
+    }
+
+
     private long calculateStartDateForAppendSegment(CubeInstance cube) {
         List<CubeSegment> existing = cube.getSegments();
         if (existing.isEmpty()) {
             return cube.getDescriptor().getPartitionDateStart();
         } else {
-            return existing.get(existing.size() - 1).getSourceOffsetEnd();
+            return existing.get(existing.size() - 1).getDateRangeStart();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 3b65d1f..d3b0782 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -26,7 +26,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,9 +34,9 @@ public class DictionaryGeneratorCLI {
 
     private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
 
-    public static void processSegment(KylinConfig config, String cubeName, String segmentName, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
+    public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        CubeSegment segment = cube.getSegmentById(segmentID);
 
         processSegment(config, segment, factTableValueProvider);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index b24ef4d..04aea89 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -78,7 +78,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         if (cubeSegment == null) {
             this.tableName = "kylin_intermediate_" + cubeDesc.getName();
         } else {
-            this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getName();
+            this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getUuid();
         }
 
         int columnIndex = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 5a098a8..6c973eb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -94,7 +94,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
 
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
@@ -116,7 +116,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
 
         appendMapReduceParameters(cmd);
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 93ae1e4..6eba3c2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -110,7 +110,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
 
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidRootPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Cube_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
@@ -135,7 +135,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
 
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
@@ -161,7 +161,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
 
         appendMapReduceParameters(cmd);
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index 0769b52..33b6f29 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -86,7 +86,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
 
         appendMapReduceParameters(cmd);
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 10483eb..71a20a1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -88,6 +88,21 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
 
     protected Class<? extends AbstractHadoopJob> getMergeCuboidJob() {
         return MergeCuboidJob.class;
+//    private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
+//        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+//        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+//        StringBuilder cmd = new StringBuilder();
+//
+//        appendMapReduceParameters(cmd);
+//        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
+//        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+//        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+//        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+//        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+//
+//        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+//        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
+//        return mergeCuboidDataStep;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 86451c9..159e5cb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -68,7 +68,7 @@ public class JobBuilderSupport {
         appendMapReduceParameters(cmd);
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getFactDistinctColumnsPath(jobId));
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
         appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId));
         appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
@@ -84,7 +84,7 @@ public class JobBuilderSupport {
         buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId));
 
         buildDictionaryStep.setJobParams(cmd.toString());

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 04ecc71..af2ed9f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -79,7 +79,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME);
     protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME);
     protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID);
+//    @Deprecated
     protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube segment name").create(BatchConstants.ARG_SEGMENT_NAME);
+    protected static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true).withDescription("Cube segment id").create(BatchConstants.ARG_SEGMENT_ID);
     protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT);
     protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT).hasArg().isRequired(false).withDescription("Input format").create(BatchConstants.ARG_INPUT_FORMAT);
     protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 387e695..f0503a8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -30,7 +30,8 @@ public interface BatchConstants {
      */
 
     String CFG_CUBE_NAME = "cube.name";
-    String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
+//    String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
+    String CFG_CUBE_SEGMENT_ID = "cube.segment.id";
     String CFG_CUBE_CUBOID_LEVEL = "cube.cuboid.level";
 
     String CFG_II_NAME = "ii.name";
@@ -65,6 +66,7 @@ public interface BatchConstants {
     String ARG_CUBE_NAME = "cubename";
     String ARG_II_NAME = "iiname";
     String ARG_SEGMENT_NAME = "segmentname";
+    String ARG_SEGMENT_ID = "segmentid";
     String ARG_PARTITION = "partitions";
     String ARG_STATS_ENABLED = "statisticsenabled";
     String ARG_STATS_OUTPUT = "statisticsoutput";

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 10fbba3..cc2bf7d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -60,7 +60,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
     public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
     public static final byte[] ONE = Bytes.toBytes("1");
     protected String cubeName;
-    protected String segmentName;
+    protected String segmentID;
     protected Cuboid baseCuboid;
     protected CubeInstance cube;
     protected CubeDesc cubeDesc;
@@ -86,7 +86,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         super.bindCurrentConfiguration(context.getConfiguration());
 
         cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
         intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
         if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
             throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
@@ -98,7 +98,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
 
         cube = CubeManager.getInstance(config).getCube(cubeName);
         cubeDesc = cube.getDescriptor();
-        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        cubeSegment = cube.getSegmentById(segmentID);
 
         long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 59233b9..69c0095 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -43,17 +43,17 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
 
         try {
             options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
             options.addOption(OPTION_INPUT_PATH);
             parseOptions(options, args);
 
             final String cubeName = getOptionValue(OPTION_CUBE_NAME);
-            final String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+            final String segmentID = getOptionValue(OPTION_SEGMENT_ID);
             final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
 
             KylinConfig config = KylinConfig.getInstanceFromEnv();
 
-            DictionaryGeneratorCLI.processSegment(config, cubeName, segmentName, new DistinctColumnValuesProvider() {
+            DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() {
                 @Override
                 public ReadableTable getDistinctValuesFor(TblColRef col) {
                     return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index f3524f8..90dec84 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -86,7 +86,7 @@ public class CuboidJob extends AbstractHadoopJob {
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
             options.addOption(OPTION_INPUT_PATH);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_NCUBOID_LEVEL);
@@ -97,14 +97,14 @@ public class CuboidJob extends AbstractHadoopJob {
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
             String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
 
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
 
             if (checkSkip(cubingJobId)) {
-                logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + cubeName + "[" + segmentName + "]");
+                logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segmentID + "[" + segmentID + "]");
                 return 0;
             }
 
@@ -115,7 +115,7 @@ public class CuboidJob extends AbstractHadoopJob {
             setJobClasspath(job, cube.getConfig());
 
             // Mapper
-            configureMapperInputFormat(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
+            configureMapperInputFormat(cube.getSegmentById(segmentID));
             job.setMapperClass(this.mapperClass);
             job.setMapOutputKeyClass(Text.class);
             job.setMapOutputValueClass(Text.class);
@@ -131,7 +131,7 @@ public class CuboidJob extends AbstractHadoopJob {
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel);
             // add metadata to distributed cache
             attachKylinPropsAndMetadata(cube, job.getConfiguration());

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index f091ab9..a4b087b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -56,7 +56,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             options.addOption(OPTION_CUBE_NAME);
             options.addOption(OPTION_CUBING_JOB_ID);
             options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
             options.addOption(OPTION_STATISTICS_ENABLED);
             options.addOption(OPTION_STATISTICS_OUTPUT);
             options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
@@ -68,7 +68,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             String cubeName = getOptionValue(OPTION_CUBE_NAME);
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
 
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
             String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
             String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
             String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
@@ -80,7 +80,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
 
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
@@ -88,10 +88,10 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
 
             setJobClasspath(job, cube.getConfig());
 
-            CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+            CubeSegment segment = cube.getSegmentById(segmentID);
             if (segment == null) {
-                logger.error("Failed to find {} in cube {}", segmentName, cube);
-                System.out.println("Failed to find {} in cube {} " + segmentName + "," + cube);
+                logger.error("Failed to find {} in cube {}", segmentID, cube);
+                System.out.println("Failed to find {} in cube {} " + segmentID + "," + cube);
                 for (CubeSegment s : cube.getSegments()) {
                     logger.error(s.getName() + " with status " + s.getStatus());
                     System.out.println(s.getName() + " with status " + s.getStatus());
@@ -101,7 +101,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 logger.info("Found segment: " + segment);
                 System.out.println("Found segment " + segment);
             }
-            setupMapper(segment);
+            setupMapper(cube.getSegmentById(segmentID));
             setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 1 : columnsNeedDict.size());
 
             attachKylinPropsAndMetadata(cube, job.getConfiguration());

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 35481fd..5680004 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -66,7 +66,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
 
         cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeSeg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), SegmentStatusEnum.NEW);
+        cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
         cubeDesc = cube.getDescriptor();
         baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 510dbe8..d50c8a5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -75,18 +75,18 @@ public class InMemCuboidJob extends AbstractHadoopJob {
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_CUBING_JOB_ID);
             parseOptions(options, args);
 
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
             String output = getOptionValue(OPTION_OUTPUT_PATH);
 
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
-            CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+            CubeSegment cubeSeg = cube.getSegmentById(segmentID);
             String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
 
             if (checkSkip(cubingJobId)) {
@@ -105,7 +105,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
             // set input
             IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 7baf5c5..75b6489 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -77,8 +77,8 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         cube = CubeManager.getInstance(config).getCube(cubeName);
         cubeDesc = cube.getDescriptor();
-        String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+        cubeSegment = cube.getSegmentById(segmentID);
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
 
         Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index 5546bce..e0ae74d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -39,13 +39,13 @@ public class MergeCuboidJob extends CuboidJob {
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
             options.addOption(OPTION_INPUT_PATH);
             options.addOption(OPTION_OUTPUT_PATH);
             parseOptions(options, args);
 
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
 
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
@@ -76,7 +76,7 @@ public class MergeCuboidJob extends CuboidJob {
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
             // add metadata to distributed cache
             attachKylinPropsAndMetadata(cube, job.getConfiguration());

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 5fd321c..906ccdc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -65,7 +65,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
     private KylinConfig config;
     private String cubeName;
-    private String segmentName;
+    private String segmentID;
     private CubeManager cubeManager;
     private CubeInstance cube;
     private CubeDesc cubeDesc;
@@ -95,14 +95,14 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         super.bindCurrentConfiguration(context.getConfiguration());
 
         cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+        segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
 
         config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         cubeManager = CubeManager.getInstance(config);
         cube = cubeManager.getCube(cubeName);
         cubeDesc = cube.getDescriptor();
-        mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        mergedCubeSegment = cube.getSegmentById(segmentID);
 
         // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
         newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];// size will auto-grow

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index d822134..fbd02ac 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -52,7 +52,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
     private Text outputKey = new Text();
     private String cubeName;
-    private String segmentName;
+    private String segmentID;
     private CubeSegment cubeSegment;
     private CubeDesc cubeDesc;
     private CuboidScheduler cuboidScheduler;
@@ -70,12 +70,12 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         super.bindCurrentConfiguration(context.getConfiguration());
 
         cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+        segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
 
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        cubeSegment = cube.getSegmentById(segmentID);
         cubeDesc = cube.getDescriptor();
 
         // initialize CubiodScheduler

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
index caf87e2..daab3b1 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
@@ -70,9 +70,9 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
     public void testMapReduceWithSlr() throws IOException {
 
         String cubeName = "test_kylin_cube_with_slr_1_new_segment";
-        String segmentName = "20130331080000_20131212080000";
+        String segmentID = "198va32a-a33e-4b69-83dd-0bb8b1f8c53b";
         mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-        mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+        mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
         byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0, -104, -106, -128, 11, 54, -105, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
         byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 };

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json
index 7bb8078..8c48ffd 100644
--- a/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json
+++ b/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json
@@ -7,6 +7,7 @@
   "descriptor" : "test_kylin_cube_with_slr_desc",
   "cost" : 50,
   "segments" : [ {
+    "uuid" : "198va32a-a33e-4b69-83dd-0bb8b1f8c53b",
     "name" : "20130331080000_20131212080000",
     "storage_location_identifier" : "KYLIN-CUBE-TEST_KYLIN_CUBE_WITH_SLR_READY-F24668F6-DCFF-4CB6-A89B-77F1119DF8FA",
     "date_range_start" : 1364688000000,

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
index 23e5b00..ef10c1e 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
@@ -106,7 +106,7 @@
     }
   } ],
   "override_kylin_properties": {
-    "kylin.cube.algorithm": "random"
+    "kylin.cube.algorithm": "inmem"
   },
   "notify_list" : [ ],
   "status_need_notify" : [ ],

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index b93e0a1..5c45673 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -66,7 +66,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
     CubeInstance cube = null;
     CubeDesc cubeDesc = null;
-    String segmentName = null;
+    String segmentID = null;
     KylinConfig kylinConfig;
     Path partitionFilePath;
 
@@ -75,7 +75,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         Options options = new Options();
 
         options.addOption(OPTION_CUBE_NAME);
-        options.addOption(OPTION_SEGMENT_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(OPTION_STATISTICS_ENABLED);
         parseOptions(options, args);
@@ -88,8 +88,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
         cube = cubeMgr.getCube(cubeName);
         cubeDesc = cube.getDescriptor();
         kylinConfig = cube.getConfig();
-        segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-        CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        segmentID = getOptionValue(OPTION_SEGMENT_ID);
+        CubeSegment cubeSegment = cube.getSegmentById(segmentID);
 
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 0914827..0679feb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -87,7 +87,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));