You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/08/10 04:13:42 UTC
[1/6] kylin git commit: refactor some streaming classes
Repository: kylin
Updated Branches:
refs/heads/master 2146f2b00 -> c67891d26
refactor some streaming classes
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a2b693c7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a2b693c7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a2b693c7
Branch: refs/heads/master
Commit: a2b693c7955b4e5c436f4f815cd5588da93f7e98
Parents: 2146f2b
Author: shaofengshi <sh...@apache.org>
Authored: Thu Jun 23 10:34:52 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:09:22 2016 +0800
----------------------------------------------------------------------
.../src/test/java/org/apache/kylin/job/DeployUtil.java | 2 +-
.../org/apache/kylin/common/util/StreamingMessage.java | 4 ++++
.../apache/kylin/source/kafka/KafkaStreamingInput.java | 3 ++-
.../org/apache/kylin/source/kafka/StreamingParser.java | 3 ++-
.../kylin/source/kafka/StringStreamingParser.java | 12 ++++--------
.../kylin/source/kafka/TimedJsonStreamParser.java | 12 ++++--------
.../kylin/source/kafka/diagnose/KafkaInputAnalyzer.java | 3 ++-
.../org/apache/kylin/source/kafka/util/KafkaUtils.java | 3 ++-
8 files changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index da97df3..6128770 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -166,7 +166,7 @@ public class DeployUtil {
TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
StringBuilder sb = new StringBuilder();
for (String json : data) {
- List<String> rowColumns = timedJsonStreamParser.parse(new MessageAndOffset(new Message(json.getBytes()), 0)).getData();
+ List<String> rowColumns = timedJsonStreamParser.parse((new MessageAndOffset(new Message(json.getBytes()), 0)).message().payload()).getData();
sb.append(StringUtils.join(rowColumns, ","));
sb.append(System.getProperty("line.separator"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
index f327db2..53ab195 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
@@ -51,6 +51,10 @@ public class StreamingMessage {
return offset;
}
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
public final long getTimestamp() {
return timestamp;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 4a3c2a9..fe3fe0a 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -205,7 +205,8 @@ public class KafkaStreamingInput implements IStreamingInput {
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
offset++;
consumeMsgCount++;
- final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+ final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+ streamingMessage.setOffset(messageAndOffset.offset());
if (streamingParser.filter(streamingMessage)) {
final long timestamp = streamingMessage.getTimestamp();
if (timestamp >= timeRange.getFirst() && timestamp < timeRange.getSecond()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 89b9b56..cb6a72b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -39,6 +39,7 @@ import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.common.util.TimeUtil;
@@ -66,7 +67,7 @@ public abstract class StreamingParser {
* @param message
* @return StreamingMessage must not be NULL
*/
- abstract public StreamingMessage parse(Object message);
+ abstract public StreamingMessage parse(ByteBuffer message);
abstract public boolean filter(StreamingMessage streamingMessage);
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index cb826cb..8888d67 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -43,8 +43,6 @@ import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.collect.Lists;
-import kafka.message.MessageAndOffset;
-
/**
*/
public final class StringStreamingParser extends StreamingParser {
@@ -55,12 +53,10 @@ public final class StringStreamingParser extends StreamingParser {
}
@Override
- public StreamingMessage parse(Object message) {
- MessageAndOffset kafkaMessage = (MessageAndOffset) message;
- final ByteBuffer payload = kafkaMessage.message().payload();
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object> emptyMap());
+ public StreamingMessage parse(ByteBuffer message) {
+ byte[] bytes = new byte[message.remaining()]; // copy position..limit (not 0..limit) so a buffer with a non-zero position does not underflow
+ message.get(bytes);
+ return new StreamingMessage(Lists.newArrayList(new String(bytes, java.nio.charset.StandardCharsets.UTF_8).split(",")), 0, 0, Collections.<String, Object> emptyMap()); // offset left at 0; Kafka callers set it via setOffset()
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 51c490e..d4308db 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -41,8 +41,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-
import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -54,8 +54,6 @@ import com.fasterxml.jackson.databind.type.MapType;
import com.fasterxml.jackson.databind.type.SimpleType;
import com.google.common.collect.Lists;
-import kafka.message.MessageAndOffset;
-
/**
* each json message with a "timestamp" field
*/
@@ -99,10 +97,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
@Override
- public StreamingMessage parse(Object msg) {
- MessageAndOffset messageAndOffset = (MessageAndOffset) msg;
+ public StreamingMessage parse(ByteBuffer buffer) {
try {
- Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+ Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
root.putAll(message);
String tsStr = root.get(tsColName);
@@ -123,8 +120,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
}
- return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object> emptyMap());
-
+ return new StreamingMessage(result, 0, t, Collections.<String, Object>emptyMap());
} catch (IOException e) {
logger.error("error", e);
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index 752ddd7..efaa042 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -109,7 +109,8 @@ public class KafkaInputAnalyzer extends AbstractApplication {
offset++;
consumeMsgCount++;
- final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+ final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+ streamingMessage.setOffset(messageAndOffset.offset());
if (streamingParser.filter(streamingMessage)) {
streamQueue.add(streamingMessage);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index 0d8499d..24eaa05 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -162,7 +162,8 @@ public final class KafkaUtils {
final ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
- final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+ final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload());
+ streamingMessage.setOffset(messageAndOffset.offset());
logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, streamingMessage.getTimestamp()));
return streamingMessage.getTimestamp();
[2/6] kylin git commit: KYLIN-1726 Scalable streaming cubing
Posted by sh...@apache.org.
KYLIN-1726 Scalable streaming cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/acde3396
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/acde3396
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/acde3396
Branch: refs/heads/master
Commit: acde339623d43fa9b441614bc64ca7960e9255fe
Parents: a2b693c
Author: shaofengshi <sh...@apache.org>
Authored: Sun Jul 3 21:43:16 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:10:10 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/job/DeployUtil.java | 11 ++++----
.../kylin/job/streaming/KafkaDataLoader.java | 11 ++++++--
.../kylin/job/streaming/StreamDataLoader.java | 22 +++++++++++++++
.../java/org/apache/kylin/source/ISource.java | 4 +++
.../org/apache/kylin/source/SourceFactory.java | 5 ++++
.../kylin/engine/mr/BatchCubingJobBuilder.java | 1 +
.../kylin/engine/mr/JobBuilderSupport.java | 2 +-
.../engine/mr/common/AbstractHadoopJob.java | 28 ++++++++++++++++++++
.../apache/kylin/engine/mr/steps/CuboidJob.java | 1 +
.../engine/mr/steps/FactDistinctColumnsJob.java | 3 +++
.../kylin/engine/mr/steps/InMemCuboidJob.java | 1 +
.../test_streaming_table_cube_desc.json | 3 +++
.../kafka/DEFAULT.STREAMING_TABLE.json | 2 +-
.../kafka/default.streaming_table.json | 2 +-
.../streaming/DEFAULT.STREAMING_TABLE.json | 2 +-
.../streaming/default.streaming_table.json | 2 +-
.../table/DEFAULT.STREAMING_TABLE.json | 3 ++-
.../kylin/provision/BuildCubeWithStream.java | 3 ++-
.../apache/kylin/source/hive/HiveSource.java | 8 ++++++
19 files changed, 99 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 6128770..986edf6 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -36,7 +36,7 @@ import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.job.dataGen.FactTableGenerator;
-import org.apache.kylin.job.streaming.KafkaDataLoader;
+import org.apache.kylin.job.streaming.StreamDataLoader;
import org.apache.kylin.job.streaming.StreamingTableDataGenerator;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
@@ -45,7 +45,6 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.hive.HiveClient;
import org.apache.kylin.source.hive.HiveCmdBuilder;
import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.maven.model.Model;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
import org.slf4j.Logger;
@@ -148,15 +147,15 @@ public class DeployUtil {
deployHiveTables();
}
- public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, KafkaConfig kafkaConfig) throws IOException {
+ public static void prepareTestDataForStreamingCube(long startTime, long endTime, String cubeName, StreamDataLoader streamDataLoader) throws IOException {
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable());
List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable());
TableDesc tableDesc = cubeInstance.getFactTableDesc();
//load into kafka
- KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data);
- KafkaDataLoader.loadIntoKafka(kafkaConfig.getKafkaClusterConfigs(), data2);
- logger.info("Write {} messages into topic {}", data.size(), kafkaConfig.getTopic());
+ streamDataLoader.loadIntoKafka(data);
+ streamDataLoader.loadIntoKafka(data2);
+ logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString());
//csv data for H2 use
List<TblColRef> tableColumns = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index 5242ff2..0eaae20 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -30,6 +30,7 @@ import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
@@ -38,9 +39,15 @@ import kafka.producer.ProducerConfig;
/**
* Load prepared data into kafka(for test use)
*/
-public class KafkaDataLoader {
+public class KafkaDataLoader extends StreamDataLoader {
+ List<KafkaClusterConfig> kafkaClusterConfigs;
- public static void loadIntoKafka(List<KafkaClusterConfig> kafkaClusterConfigs, List<String> messages) {
+ public KafkaDataLoader(KafkaConfig kafkaConfig) {
+ super(kafkaConfig);
+ this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs();
+ }
+
+ public void loadIntoKafka(List<String> messages) {
KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
new file mode 100644
index 0000000..50fc883
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamDataLoader.java
@@ -0,0 +1,22 @@
+package org.apache.kylin.job.streaming;
+
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+
+import java.util.List;
+
+/** Base class for test helpers that load prepared messages into the stream described by a {@link KafkaConfig}. */
+public abstract class StreamDataLoader {
+ protected KafkaConfig kafkaConfig;
+ public StreamDataLoader(KafkaConfig kafkaConfig) {
+ this.kafkaConfig = kafkaConfig;
+ }
+ /** Loads the given messages into the stream source; implemented by subclasses. */
+ abstract public void loadIntoKafka(List<String> messages);
+
+ /** Describes the load target as "kafka topic " followed by the configured topic, e.g. for log output. */
+ @Override
+ public String toString() {
+ return "kafka topic " + kafkaConfig.getTopic();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 3cd8a02..e9216f9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -20,9 +20,13 @@ package org.apache.kylin.source;
import org.apache.kylin.metadata.model.TableDesc;
+import java.util.List;
+
public interface ISource {
public <I> I adaptToBuildEngine(Class<I> engineInterface);
public ReadableTable createReadableTable(TableDesc tableDesc);
+
+ public List<String> getMRDependentResources(TableDesc table);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
index f701a0f..e82c6ed 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java
@@ -18,6 +18,7 @@
package org.apache.kylin.source;
+import java.util.List;
import java.util.Map;
import org.apache.kylin.common.KylinConfig;
@@ -45,4 +46,8 @@ public class SourceFactory {
return tableSource(table).adaptToBuildEngine(engineInterface);
}
+ public static List<String> getMRDependentResources(TableDesc table) {
+ return tableSource(table).getMRDependentResources(table);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index ec9b1c6..5a098a8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -99,6 +99,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
baseCuboidStep.setMapReduceParams(cmd.toString());
baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 3e9aff6..86451c9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -73,7 +73,7 @@ public class JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId));
appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
-
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
result.setMapReduceParams(cmd.toString());
return result;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 02928e0..04ecc71 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -29,6 +29,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
@@ -67,6 +68,7 @@ import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.SourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -164,6 +166,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
+ String kylinKafkaDependency = System.getProperty("kylin.kafka.dependency");
logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
Configuration jobConf = job.getConfiguration();
@@ -221,6 +224,29 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
}
+ // for kafka dependencies
+ if (kylinKafkaDependency != null) {
+ kylinKafkaDependency = kylinKafkaDependency.replace(":", ",");
+ logger.info("Kafka Dependencies Before Filtered: " + kylinKafkaDependency);
+
+ if (kylinDependency.length() > 0)
+ kylinDependency.append(",");
+ kylinDependency.append(kylinKafkaDependency);
+ } else {
+ // fall back to the jar that provides KafkaConsumer on this JVM's classpath
+ logger.info("No Kafka dependency jars set in the environment, will find them from jvm:");
+ try {
+ String kafkaClientJarPath = ClassUtil.findContainingJar(Class.forName("org.apache.kafka.clients.consumer.KafkaConsumer"));
+ // separator goes before the entry (as in the branch above) so the path is never glued onto a previous entry
+ if (kylinDependency.length() > 0)
+ kylinDependency.append(",");
+ kylinDependency.append(kafkaClientJarPath);
+ logger.info("kafka jar file: " + kafkaClientJarPath);
+ } catch (ClassNotFoundException e) {
+ logger.error("Cannot find kafka dependency jars", e);
+ }
+ }
+
// for KylinJobMRLibDir
String mrLibDir = kylinConf.getKylinJobMRLibDir();
if (!StringUtils.isBlank(mrLibDir)) {
@@ -442,6 +468,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
TableDesc table = metaMgr.getTableDesc(tableName);
dumpList.add(table.getResourcePath());
+ List<String> dependentResources = SourceFactory.getMRDependentResources(table);
+ dumpList.addAll(dependentResources);
}
for (CubeSegment segment : cube.getSegments()) {
dumpList.addAll(segment.getDictionaryPaths());
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index f037d2e..f3524f8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -109,6 +109,7 @@ public class CuboidJob extends AbstractHadoopJob {
}
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 39aae72..f091ab9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -54,6 +54,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_CUBING_JOB_ID);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_SEGMENT_NAME);
options.addOption(OPTION_STATISTICS_ENABLED);
@@ -62,6 +63,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
parseOptions(options, args);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ String job_id = getOptionValue(OPTION_CUBING_JOB_ID);
+ job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id);
String cubeName = getOptionValue(OPTION_CUBE_NAME);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index f5076e4..510dbe8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -95,6 +95,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
}
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
logger.info("Starting: " + job.getJobName());
setJobClasspath(job, cube.getConfig());
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
index 0267db5..23e5b00 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
@@ -105,6 +105,9 @@
"joint_dims" : [ ]
}
} ],
+ "override_kylin_properties": {
+ "kylin.cube.algorithm": "random"
+ },
"notify_list" : [ ],
"status_need_notify" : [ ],
"auto_merge_time_ranges" : null,
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
index c97927d..6a64cce 100644
--- a/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/kafka/DEFAULT.STREAMING_TABLE.json
@@ -1,7 +1,7 @@
{
"uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
- "name": "default.streaming_table",
+ "name": "DEFAULT.STREAMING_TABLE",
"topic": "test_streaming_table_topic_xyz",
"timeout": 60000,
"bufferSize": 65536,
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/kafka/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/default.streaming_table.json b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
index c97927d..6a64cce 100644
--- a/examples/test_case_data/localmeta/kafka/default.streaming_table.json
+++ b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
@@ -1,7 +1,7 @@
{
"uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
- "name": "default.streaming_table",
+ "name": "DEFAULT.STREAMING_TABLE",
"topic": "test_streaming_table_topic_xyz",
"timeout": 60000,
"bufferSize": 65536,
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
index 6eb4a88..85a477b 100644
--- a/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/streaming/DEFAULT.STREAMING_TABLE.json
@@ -1,6 +1,6 @@
{
"uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
- "name": "default.streaming_table",
+ "name": "DEFAULT.STREAMING_TABLE",
"type": "kafka",
"last_modified": 0
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/streaming/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/default.streaming_table.json b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
index 6eb4a88..85a477b 100644
--- a/examples/test_case_data/localmeta/streaming/default.streaming_table.json
+++ b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
@@ -1,6 +1,6 @@
{
"uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
- "name": "default.streaming_table",
+ "name": "DEFAULT.STREAMING_TABLE",
"type": "kafka",
"last_modified": 0
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
index 82f6fdb..5bcfa35 100644
--- a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
@@ -41,5 +41,6 @@
],
"database": "DEFAULT",
"source_type": 1,
- "last_modified": 0
+ "last_modified": 0,
+ "source_type" : 1
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 95f0f3d..9490560 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -33,6 +33,7 @@ import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.job.streaming.KafkaDataLoader;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -99,7 +100,7 @@ public class BuildCubeWithStream {
streamingConfig.setTopic(UUID.randomUUID().toString());
KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig);
- DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, streamingConfig);
+ DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, new KafkaDataLoader(streamingConfig));
}
public void cleanup() throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/acde3396/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index b7dbff0..e9cebea 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -18,11 +18,14 @@
package org.apache.kylin.source.hive;
+import com.google.common.collect.Lists;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ReadableTable;
+import java.util.List;
+
//used by reflection
public class HiveSource implements ISource {
@@ -41,4 +44,9 @@ public class HiveSource implements ISource {
return new HiveTable(tableDesc);
}
+ @Override
+ public List<String> getMRDependentResources(TableDesc table) {
+ return Lists.newArrayList();
+ }
+
}
[4/6] kylin git commit: KYLIN-1726 Scalable streaming cubing
Posted by sh...@apache.org.
KYLIN-1726 Scalable streaming cubing
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/23407e3d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/23407e3d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/23407e3d
Branch: refs/heads/master
Commit: 23407e3ddeff9011151d871f1f4e51c0a987564c
Parents: acde339
Author: shaofengshi <sh...@apache.org>
Authored: Wed Jul 6 11:33:30 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:23:55 2016 +0800
----------------------------------------------------------------------
.../test/java/org/apache/kylin/job/DeployUtil.java | 11 ++++-------
.../main/java/org/apache/kylin/cube/CubeSegment.java | 15 +++++++--------
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 5 ++++-
.../java/org/apache/kylin/engine/mr/IMRInput.java | 10 ++++++++++
.../main/java/org/apache/kylin/engine/mr/MRUtil.java | 4 ++++
.../localmeta/data/DEFAULT.STREAMING_TABLE.csv | 1 +
.../org/apache/kylin/source/hive/HiveMRInput.java | 10 ++++++++++
7 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 986edf6..8c64f91 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -18,12 +18,8 @@
package org.apache.kylin.job;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
+import java.nio.ByteBuffer;
import java.util.List;
import org.apache.commons.io.IOUtils;
@@ -165,7 +161,7 @@ public class DeployUtil {
TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true");
StringBuilder sb = new StringBuilder();
for (String json : data) {
- List<String> rowColumns = timedJsonStreamParser.parse((new MessageAndOffset(new Message(json.getBytes()), 0)).message().payload()).getData();
+ List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).getData();
sb.append(StringUtils.join(rowColumns, ","));
sb.append(System.getProperty("line.separator"));
}
@@ -183,6 +179,7 @@ public class DeployUtil {
in.close();
}
+
private static void deployHiveTables() throws Exception {
MetadataManager metaMgr = MetadataManager.getInstance(config());
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index abfb5ff..4697c63 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -121,19 +121,18 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable {
* returns "yyyyMMddHHmmss_yyyyMMddHHmmss"
*/
public static String makeSegmentName(long startDate, long endDate, long startOffset, long endOffset) {
- if (startOffset == 0 && endOffset == 0) {
- startOffset = startDate;
- endOffset = endDate;
- }
+ if (startOffset != 0 || endOffset != 0) {
+ if (startOffset == 0 && (endOffset == 0 || endOffset == Long.MAX_VALUE)) {
+ return "FULL_BUILD";
+ }
- if (startOffset == 0 && (endOffset == 0 || endOffset == Long.MAX_VALUE)) {
- return "FULL_BUILD";
+ return startOffset + "_" + endOffset;
}
+ // using time
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
-
- return dateFormat.format(startOffset) + "_" + dateFormat.format(endOffset);
+ return dateFormat.format(startDate) + "_" + dateFormat.format(endDate);
}
// ============================================================================
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 33081c7..10483eb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -36,10 +36,12 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class);
private final IMROutput2.IMRBatchMergeOutputSide2 outputSide;
+ private final IMRInput.IMRBatchMergeInputSide inputSide;
public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
- this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment) seg);
+ this.outputSide = MRUtil.getBatchMergeOutputSide2(seg);
+ this.inputSide = MRUtil.getBatchMergeInputSide(seg);
}
public CubingJob build() {
@@ -57,6 +59,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
}
// Phase 1: Merge Dictionary
+ inputSide.addStepPhase1_MergeDictionary(result);
result.addTask(createMergeDictionaryStep(mergingSegmentIds));
result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
outputSide.addStepPhase1_MergeDictionary(result);
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 336a66f..6e01877 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -34,6 +34,9 @@ public interface IMRInput {
/** Return an InputFormat that reads from specified table. */
public IMRTableInputFormat getTableInputFormat(TableDesc table);
+ /** Return a helper to participate in batch cubing merge job flow. */
+ public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg);
+
/**
* Utility that configures mapper to read from a table.
*/
@@ -66,4 +69,11 @@ public interface IMRInput {
/** Add step that does necessary clean up, like delete the intermediate flat table */
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
+
+ public interface IMRBatchMergeInputSide {
+
+ /** Add step that executes before merge dictionary and before merge cube. */
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 1a86329..14fdd93 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -68,6 +68,10 @@ public class MRUtil {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
}
+ public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+ return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
+ }
+
// use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale
// Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
public static int runMRJob(Tool tool, String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
index e69de29..8b13789 100644
--- a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
+++ b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
@@ -0,0 +1 @@
+
http://git-wip-us.apache.org/repos/asf/kylin/blob/23407e3d/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 0c969f2..9ec0f02 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -65,6 +65,16 @@ public class HiveMRInput implements IMRInput {
return new HiveTableInputFormat(table.getIdentity());
}
+ @Override
+ public IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) {
+ return new IMRBatchMergeInputSide() {
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ // doing nothing
+ }
+ };
+ }
+
public static class HiveTableInputFormat implements IMRTableInputFormat {
final String dbName;
final String tableName;
[3/6] kylin git commit: KYLIN-1726 add partition column in fact distinct output
Posted by sh...@apache.org.
KYLIN-1726 add partition column in fact distinct output
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/76d8672c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/76d8672c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/76d8672c
Branch: refs/heads/master
Commit: 76d8672ca77ee662bece45e1b2428d42d9beccc6
Parents: 23407e3
Author: shaofengshi <sh...@apache.org>
Authored: Fri Jul 8 10:16:53 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:23:55 2016 +0800
----------------------------------------------------------------------
.../src/main/java/org/apache/kylin/cube/CubeManager.java | 8 ++++++++
.../localmeta/table/DEFAULT.STREAMING_TABLE.json | 1 -
2 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/76d8672c/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 0941d56..a456537 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -925,6 +925,14 @@ public class CubeManager implements IRealizationProvider {
factDictCols.add(col);
}
}
+
+ // add partition column in all case
+ if (cubeDesc.getModel().getPartitionDesc() != null) {
+ TblColRef partitionCol = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+ if (factDictCols.contains(partitionCol) == false) {
+ factDictCols.add(partitionCol);
+ }
+ }
return factDictCols;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/76d8672c/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
index 5bcfa35..64f359b 100644
--- a/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
+++ b/examples/test_case_data/localmeta/table/DEFAULT.STREAMING_TABLE.json
@@ -1,5 +1,4 @@
{
-
"uuid": "e286e39e-40d7-44c2-8fa2-41b365123987",
"name": "STREAMING_TABLE",
"columns": [
[6/6] kylin git commit: KYLIN-1859 reorg imports
Posted by sh...@apache.org.
KYLIN-1859 reorg imports
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c67891d2
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c67891d2
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c67891d2
Branch: refs/heads/master
Commit: c67891d26a5a937cc4850fc389ad395d1609b575
Parents: a35dc3c
Author: shaofengshi <sh...@apache.org>
Authored: Wed Aug 10 12:13:05 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 12:13:05 2016 +0800
----------------------------------------------------------------------
core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java | 2 +-
.../org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java | 1 -
.../src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java | 1 -
.../org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java | 1 -
.../kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java | 1 -
.../main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java | 1 -
.../java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java | 1 -
.../java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java | 1 -
.../main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java | 1 -
.../java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java | 1 -
10 files changed, 1 insertion(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 4533ae6..2ebf5d3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -590,7 +590,7 @@ public class CubeManager implements IRealizationProvider {
if (existing.isEmpty()) {
return cube.getDescriptor().getPartitionDateStart();
} else {
- return existing.get(existing.size() - 1).getDateRangeStart();
+ return existing.get(existing.size() - 1).getDateRangeEnd();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index cc2bf7d..85e3cc7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -46,7 +46,6 @@ import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index 90dec84..0399300 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -44,7 +44,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index a4b087b..a6c4d30 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -37,7 +37,6 @@ import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 5680004..a91d333 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -36,7 +36,6 @@ import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
/**
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index d50c8a5..f6ed8e7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -41,7 +41,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 75b6489..24c37ce 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -45,7 +45,6 @@ import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 906ccdc..a6bef83 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -51,7 +51,6 @@ import org.apache.kylin.measure.BufferedMeasureEncoder;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.collect.Lists;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index fbd02ac..b566c2e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -38,7 +38,6 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67891d2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 5c45673..f83ad7d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -50,7 +50,6 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.CuboidShardUtil;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
[5/6] kylin git commit: KYLIN-1859 Use segment "uuid" instead of "name" to seek a segment across the system
Posted by sh...@apache.org.
KYLIN-1859 Use segment "uuid" instead of "name" to seek a segment across the system
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a35dc3cb
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a35dc3cb
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a35dc3cb
Branch: refs/heads/master
Commit: a35dc3cb6b4e5f6a89001c146648200a15456443
Parents: 76d8672
Author: shaofengshi <sh...@apache.org>
Authored: Fri Jul 8 13:29:35 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Aug 10 10:31:42 2016 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/KafkaDataLoader.java | 2 +-
.../java/org/apache/kylin/cube/CubeManager.java | 23 ++++++++++++++++----
.../kylin/cube/cli/DictionaryGeneratorCLI.java | 5 ++---
.../cube/model/CubeJoinedFlatTableDesc.java | 2 +-
.../kylin/engine/mr/BatchCubingJobBuilder.java | 4 ++--
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 6 ++---
.../kylin/engine/mr/BatchMergeJobBuilder.java | 2 +-
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 15 +++++++++++++
.../kylin/engine/mr/JobBuilderSupport.java | 4 ++--
.../engine/mr/common/AbstractHadoopJob.java | 2 ++
.../kylin/engine/mr/common/BatchConstants.java | 4 +++-
.../engine/mr/steps/BaseCuboidMapperBase.java | 6 ++---
.../engine/mr/steps/CreateDictionaryJob.java | 6 ++---
.../apache/kylin/engine/mr/steps/CuboidJob.java | 10 ++++-----
.../engine/mr/steps/FactDistinctColumnsJob.java | 14 ++++++------
.../mr/steps/FactDistinctColumnsMapperBase.java | 2 +-
.../kylin/engine/mr/steps/InMemCuboidJob.java | 8 +++----
.../engine/mr/steps/InMemCuboidMapper.java | 4 ++--
.../kylin/engine/mr/steps/MergeCuboidJob.java | 6 ++---
.../engine/mr/steps/MergeCuboidMapper.java | 6 ++---
.../kylin/engine/mr/steps/NDCuboidMapper.java | 6 ++---
.../engine/mr/steps/NDCuboidMapperTest.java | 4 ++--
.../test_kylin_cube_with_slr_1_new_segment.json | 1 +
.../test_streaming_table_cube_desc.json | 2 +-
.../storage/hbase/steps/CreateHTableJob.java | 8 +++----
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 2 +-
26 files changed, 94 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
index 0eaae20..454f6cf 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -69,7 +69,7 @@ public class KafkaDataLoader extends StreamDataLoader {
List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
for (int i = 0; i < messages.size(); ++i) {
- KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
+ KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
keyedMessages.add(keyedMessage);
}
producer.send(keyedMessages);
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index a456537..4533ae6 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -421,10 +421,14 @@ public class CubeManager implements IRealizationProvider {
// try figure out a reasonable start if missing
if (startDate == 0 && startOffset == 0) {
boolean isOffsetsOn = endOffset != 0;
- if (isOffsetsOn)
- startOffset = calculateStartDateForAppendSegment(cube);
- else
+ if (isOffsetsOn) {
+ startOffset = calculateStartOffsetForAppendSegment(cube);
+ if (startOffset == Long.MAX_VALUE) {
+ throw new IllegalStateException("There is already one pending for building segment, please submit request later.");
+ }
+ } else {
startDate = calculateStartDateForAppendSegment(cube);
+ }
}
} else {
startDate = 0;
@@ -570,12 +574,23 @@ public class CubeManager implements IRealizationProvider {
return max;
}
+
+ private long calculateStartOffsetForAppendSegment(CubeInstance cube) {
+ List<CubeSegment> existing = cube.getSegments();
+ if (existing.isEmpty()) {
+ return 0;
+ } else {
+ return existing.get(existing.size() - 1).getSourceOffsetEnd();
+ }
+ }
+
+
private long calculateStartDateForAppendSegment(CubeInstance cube) {
List<CubeSegment> existing = cube.getSegments();
if (existing.isEmpty()) {
return cube.getDescriptor().getPartitionDateStart();
} else {
- return existing.get(existing.size() - 1).getSourceOffsetEnd();
+ return existing.get(existing.size() - 1).getDateRangeEnd();
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 3b65d1f..d3b0782 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -26,7 +26,6 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.DimensionDesc;
import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,9 +34,9 @@ public class DictionaryGeneratorCLI {
private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
- public static void processSegment(KylinConfig config, String cubeName, String segmentName, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
+ public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ CubeSegment segment = cube.getSegmentById(segmentID);
processSegment(config, segment, factTableValueProvider);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index b24ef4d..04aea89 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -78,7 +78,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
if (cubeSegment == null) {
this.tableName = "kylin_intermediate_" + cubeDesc.getName();
} else {
- this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getName();
+ this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getUuid();
}
int columnIndex = 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 5a098a8..6c973eb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -94,7 +94,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
@@ -116,7 +116,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
appendMapReduceParameters(cmd);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 93ae1e4..6eba3c2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -110,7 +110,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidRootPath);
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Cube_Builder_" + seg.getRealization().getName());
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
@@ -135,7 +135,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]);
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
@@ -161,7 +161,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
appendMapReduceParameters(cmd);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index 0769b52..33b6f29 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -86,7 +86,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
appendMapReduceParameters(cmd);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 10483eb..71a20a1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -88,6 +88,21 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
protected Class<? extends AbstractHadoopJob> getMergeCuboidJob() {
return MergeCuboidJob.class;
+// private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
+// MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+// mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+// StringBuilder cmd = new StringBuilder();
+//
+// appendMapReduceParameters(cmd);
+// appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
+// appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+// appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+// appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+// appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+//
+// mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+// mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
+// return mergeCuboidDataStep;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 86451c9..159e5cb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -68,7 +68,7 @@ public class JobBuilderSupport {
appendMapReduceParameters(cmd);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getFactDistinctColumnsPath(jobId));
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId));
appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
@@ -84,7 +84,7 @@ public class JobBuilderSupport {
buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
StringBuilder cmd = new StringBuilder();
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId));
buildDictionaryStep.setJobParams(cmd.toString());
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 04ecc71..af2ed9f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -79,7 +79,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME);
protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME);
protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID);
+// @Deprecated
protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube segment name").create(BatchConstants.ARG_SEGMENT_NAME);
+ protected static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName(BatchConstants.ARG_SEGMENT_ID).hasArg().isRequired(true).withDescription("Cube segment id").create(BatchConstants.ARG_SEGMENT_ID);
protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT);
protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT).hasArg().isRequired(false).withDescription("Input format").create(BatchConstants.ARG_INPUT_FORMAT);
protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT);
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 387e695..f0503a8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -30,7 +30,8 @@ public interface BatchConstants {
*/
String CFG_CUBE_NAME = "cube.name";
- String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
+// String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
+ String CFG_CUBE_SEGMENT_ID = "cube.segment.id";
String CFG_CUBE_CUBOID_LEVEL = "cube.cuboid.level";
String CFG_II_NAME = "ii.name";
@@ -65,6 +66,7 @@ public interface BatchConstants {
String ARG_CUBE_NAME = "cubename";
String ARG_II_NAME = "iiname";
String ARG_SEGMENT_NAME = "segmentname";
+ String ARG_SEGMENT_ID = "segmentid";
String ARG_PARTITION = "partitions";
String ARG_STATS_ENABLED = "statisticsenabled";
String ARG_STATS_OUTPUT = "statisticsoutput";
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 10fbba3..cc2bf7d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -60,7 +60,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
public static final byte[] ONE = Bytes.toBytes("1");
protected String cubeName;
- protected String segmentName;
+ protected String segmentID;
protected Cuboid baseCuboid;
protected CubeInstance cube;
protected CubeDesc cubeDesc;
@@ -86,7 +86,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
super.bindCurrentConfiguration(context.getConfiguration());
cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+ segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
@@ -98,7 +98,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
cube = CubeManager.getInstance(config).getCube(cubeName);
cubeDesc = cube.getDescriptor();
- cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ cubeSegment = cube.getSegmentById(segmentID);
long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 59233b9..69c0095 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -43,17 +43,17 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
try {
options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_INPUT_PATH);
parseOptions(options, args);
final String cubeName = getOptionValue(OPTION_CUBE_NAME);
- final String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+ final String segmentID = getOptionValue(OPTION_SEGMENT_ID);
final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
KylinConfig config = KylinConfig.getInstanceFromEnv();
- DictionaryGeneratorCLI.processSegment(config, cubeName, segmentName, new DistinctColumnValuesProvider() {
+ DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() {
@Override
public ReadableTable getDistinctValuesFor(TblColRef col) {
return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index f3524f8..90dec84 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -86,7 +86,7 @@ public class CuboidJob extends AbstractHadoopJob {
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_NCUBOID_LEVEL);
@@ -97,14 +97,14 @@ public class CuboidJob extends AbstractHadoopJob {
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+ String segmentID = getOptionValue(OPTION_SEGMENT_ID);
String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);
if (checkSkip(cubingJobId)) {
- logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + cubeName + "[" + segmentName + "]");
+ logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + cubeName + "[" + segmentID + "]");
return 0;
}
@@ -115,7 +115,7 @@ public class CuboidJob extends AbstractHadoopJob {
setJobClasspath(job, cube.getConfig());
// Mapper
- configureMapperInputFormat(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
+ configureMapperInputFormat(cube.getSegmentById(segmentID));
job.setMapperClass(this.mapperClass);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
@@ -131,7 +131,7 @@ public class CuboidJob extends AbstractHadoopJob {
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel);
// add metadata to distributed cache
attachKylinPropsAndMetadata(cube, job.getConfiguration());
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index f091ab9..a4b087b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -56,7 +56,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_CUBING_JOB_ID);
options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_STATISTICS_ENABLED);
options.addOption(OPTION_STATISTICS_OUTPUT);
options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
@@ -68,7 +68,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
String cubeName = getOptionValue(OPTION_CUBE_NAME);
Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+ String segmentID = getOptionValue(OPTION_SEGMENT_ID);
String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
@@ -80,7 +80,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
@@ -88,10 +88,10 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
setJobClasspath(job, cube.getConfig());
- CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ CubeSegment segment = cube.getSegmentById(segmentID);
if (segment == null) {
- logger.error("Failed to find {} in cube {}", segmentName, cube);
- System.out.println("Failed to find {} in cube {} " + segmentName + "," + cube);
+ logger.error("Failed to find {} in cube {}", segmentID, cube);
+ System.out.println("Failed to find {} in cube {} " + segmentID + "," + cube);
for (CubeSegment s : cube.getSegments()) {
logger.error(s.getName() + " with status " + s.getStatus());
System.out.println(s.getName() + " with status " + s.getStatus());
@@ -101,7 +101,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
logger.info("Found segment: " + segment);
System.out.println("Found segment " + segment);
}
- setupMapper(segment);
+ setupMapper(cube.getSegmentById(segmentID));
setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 1 : columnsNeedDict.size());
attachKylinPropsAndMetadata(cube, job.getConfiguration());
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 35481fd..5680004 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -66,7 +66,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeSeg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), SegmentStatusEnum.NEW);
+ cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
cubeDesc = cube.getDescriptor();
baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc);
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 510dbe8..d50c8a5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -75,18 +75,18 @@ public class InMemCuboidJob extends AbstractHadoopJob {
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_CUBING_JOB_ID);
parseOptions(options, args);
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+ String segmentID = getOptionValue(OPTION_SEGMENT_ID);
String output = getOptionValue(OPTION_OUTPUT_PATH);
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);
- CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ CubeSegment cubeSeg = cube.getSegmentById(segmentID);
String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
if (checkSkip(cubingJobId)) {
@@ -105,7 +105,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
// set input
IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 7baf5c5..75b6489 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -77,8 +77,8 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
cube = CubeManager.getInstance(config).getCube(cubeName);
cubeDesc = cube.getDescriptor();
- String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
- cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+ cubeSegment = cube.getSegmentById(segmentID);
flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index 5546bce..e0ae74d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -39,13 +39,13 @@ public class MergeCuboidJob extends CuboidJob {
try {
options.addOption(OPTION_JOB_NAME);
options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_OUTPUT_PATH);
parseOptions(options, args);
String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+ String segmentID = getOptionValue(OPTION_SEGMENT_ID);
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);
@@ -76,7 +76,7 @@ public class MergeCuboidJob extends CuboidJob {
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
// add metadata to distributed cache
attachKylinPropsAndMetadata(cube, job.getConfiguration());
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 5fd321c..906ccdc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -65,7 +65,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private KylinConfig config;
private String cubeName;
- private String segmentName;
+ private String segmentID;
private CubeManager cubeManager;
private CubeInstance cube;
private CubeDesc cubeDesc;
@@ -95,14 +95,14 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
super.bindCurrentConfiguration(context.getConfiguration());
cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+ segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
config = AbstractHadoopJob.loadKylinPropsAndMetadata();
cubeManager = CubeManager.getInstance(config);
cube = cubeManager.getCube(cubeName);
cubeDesc = cube.getDescriptor();
- mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ mergedCubeSegment = cube.getSegmentById(segmentID);
// int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];// size will auto-grow
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index d822134..fbd02ac 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -52,7 +52,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private Text outputKey = new Text();
private String cubeName;
- private String segmentName;
+ private String segmentID;
private CubeSegment cubeSegment;
private CubeDesc cubeDesc;
private CuboidScheduler cuboidScheduler;
@@ -70,12 +70,12 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
super.bindCurrentConfiguration(context.getConfiguration());
cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
+ segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ cubeSegment = cube.getSegmentById(segmentID);
cubeDesc = cube.getDescriptor();
// initialize CubiodScheduler
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
index caf87e2..daab3b1 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NDCuboidMapperTest.java
@@ -70,9 +70,9 @@ public class NDCuboidMapperTest extends LocalFileMetadataTestCase {
public void testMapReduceWithSlr() throws IOException {
String cubeName = "test_kylin_cube_with_slr_1_new_segment";
- String segmentName = "20130331080000_20131212080000";
+ String segmentID = "198va32a-a33e-4b69-83dd-0bb8b1f8c53b";
mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ mapReduceDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
byte[] key = { 0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0, -104, -106, -128, 11, 54, -105, 55, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 13, 71, 114, 65, 66, 73, 78, 9, 9, 9, 9, 9, 9, 9, 9, 0, 10, 0 };
byte[] value = { 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 14, 7, 23, -16, 56, 92, 114, -80, 118, 1, 1 };
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json
index 7bb8078..8c48ffd 100644
--- a/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json
+++ b/examples/test_case_data/localmeta/cube/test_kylin_cube_with_slr_1_new_segment.json
@@ -7,6 +7,7 @@
"descriptor" : "test_kylin_cube_with_slr_desc",
"cost" : 50,
"segments" : [ {
+ "uuid" : "198va32a-a33e-4b69-83dd-0bb8b1f8c53b",
"name" : "20130331080000_20131212080000",
"storage_location_identifier" : "KYLIN-CUBE-TEST_KYLIN_CUBE_WITH_SLR_READY-F24668F6-DCFF-4CB6-A89B-77F1119DF8FA",
"date_range_start" : 1364688000000,
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
index 23e5b00..ef10c1e 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json
@@ -106,7 +106,7 @@
}
} ],
"override_kylin_properties": {
- "kylin.cube.algorithm": "random"
+ "kylin.cube.algorithm": "inmem"
},
"notify_list" : [ ],
"status_need_notify" : [ ],
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index b93e0a1..5c45673 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -66,7 +66,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
CubeInstance cube = null;
CubeDesc cubeDesc = null;
- String segmentName = null;
+ String segmentID = null;
KylinConfig kylinConfig;
Path partitionFilePath;
@@ -75,7 +75,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
Options options = new Options();
options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_PARTITION_FILE_PATH);
options.addOption(OPTION_STATISTICS_ENABLED);
parseOptions(options, args);
@@ -88,8 +88,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
cube = cubeMgr.getCube(cubeName);
cubeDesc = cube.getDescriptor();
kylinConfig = cube.getConfig();
- segmentName = getOptionValue(OPTION_SEGMENT_NAME);
- CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+ segmentID = getOptionValue(OPTION_SEGMENT_ID);
+ CubeSegment cubeSegment = cube.getSegmentById(segmentID);
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
http://git-wip-us.apache.org/repos/asf/kylin/blob/a35dc3cb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 0914827..0679feb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -87,7 +87,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
StringBuilder cmd = new StringBuilder();
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
- appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));