You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/03/10 08:50:46 UTC
[43/50] [abbrv] kylin git commit: initial commit for KYLIN-1431
initial commit for KYLIN-1431
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9021f17d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9021f17d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9021f17d
Branch: refs/heads/master
Commit: 9021f17d85be01bf34b48a7a31be82f53ceb9c8f
Parents: 0ec3ed0
Author: shaofengshi <sh...@apache.org>
Authored: Wed Mar 2 11:16:46 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 8 15:07:04 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/job/CubeMetaExtractor.java | 2 +-
build/bin/streaming_build.sh | 4 +-
build/bin/streaming_fillgap.sh | 5 +-
.../kylin/metadata/model/ISourceAware.java | 1 +
.../kylin/engine/streaming/BootstrapConfig.java | 20 +-
.../kylin/engine/streaming/IStreamingInput.java | 3 +-
.../streaming/OneOffStreamingBuilder.java | 17 +-
.../kylin/engine/streaming/StreamingConfig.java | 33 +--
.../engine/streaming/StreamingManager.java | 12 +
.../engine/streaming/cli/StreamingCLI.java | 21 +-
.../engine/streaming/util/StreamingUtils.java | 18 +-
.../kafka/default.streaming_table.json | 21 ++
.../localmeta/kafka/kafka_test.json | 20 --
.../kafka/test_streaming_table_cube.json | 22 --
.../kafka/test_streaming_table_ii.json | 22 --
.../streaming/default.streaming_table.json | 6 +
.../localmeta/streaming/kafka_test.json | 20 --
.../streaming/test_streaming_table_cube.json | 8 -
.../streaming/test_streaming_table_ii.json | 8 -
.../kylin/provision/BuildCubeWithStream.java | 16 +-
.../kylin/rest/controller/CubeController.java | 234 -------------------
.../rest/controller/StreamingController.java | 4 +-
.../kylin/rest/service/StreamingService.java | 18 +-
.../kylin/source/kafka/KafkaStreamingInput.java | 78 ++++---
.../kylin/source/kafka/StreamingParser.java | 6 +-
25 files changed, 163 insertions(+), 456 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
index 527ef0a..ef27ade 100644
--- a/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
+++ b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
@@ -227,7 +227,7 @@ public class CubeMetaExtractor extends AbstractApplication {
private void dealWithStreaming(CubeInstance cube) {
for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) {
- if (streamingConfig.getCubeName() != null && streamingConfig.getCubeName().equalsIgnoreCase(cube.getName())) {
+ if (streamingConfig.getName() != null && streamingConfig.getName().equalsIgnoreCase(cube.getFactTable())) {
requiredResources.add(StreamingConfig.concatResourcePath(streamingConfig.getName()));
requiredResources.add(KafkaConfig.concatResourcePath(streamingConfig.getName()));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/build/bin/streaming_build.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh
index a96ecc1..ed19036 100644
--- a/build/bin/streaming_build.sh
+++ b/build/bin/streaming_build.sh
@@ -20,7 +20,7 @@
source /etc/profile
source ~/.bash_profile
-STREAMING=$1
+CUBE=$1
INTERVAL=$2
DELAY=$3
CURRENT_TIME_IN_SECOND=`date +%s`
@@ -30,4 +30,4 @@ END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL))
ID="$START"_"$END"
echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -start ${START} -end ${END} -streaming ${STREAMING}
\ No newline at end of file
+sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${CUBE} ${ID} -start ${START} -end ${END} -cube ${CUBE}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/build/bin/streaming_fillgap.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh
index 74d9037..c67809a 100644
--- a/build/bin/streaming_fillgap.sh
+++ b/build/bin/streaming_fillgap.sh
@@ -20,8 +20,7 @@
source /etc/profile
source ~/.bash_profile
-streaming=$1
-margin=$2
+cube=$1
cd ${KYLIN_HOME}
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${streaming} fillgap -streaming ${streaming} -fillGap true -margin ${margin}
\ No newline at end of file
+sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${cube} fillgap -cube ${cube} -fillGap true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
index 3d89f40..8cfda15 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.model;
public interface ISourceAware {
public static final int ID_HIVE = 0;
+ public static final int ID_STREAMING = 1;
public static final int ID_SPARKSQL = 5;
int getSourceType();
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
index a3e2db5..a4c4618 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
@@ -4,9 +4,7 @@ package org.apache.kylin.engine.streaming;
*/
public class BootstrapConfig {
- private String streaming;
- private int partitionId = -1;
-
+ private String cubeName;
private long start = 0L;
private long end = 0L;
@@ -28,20 +26,12 @@ public class BootstrapConfig {
this.end = end;
}
- public String getStreaming() {
- return streaming;
- }
-
- public void setStreaming(String streaming) {
- this.streaming = streaming;
- }
-
- public int getPartitionId() {
- return partitionId;
+ public String getCubeName() {
+ return cubeName;
}
- public void setPartitionId(int partitionId) {
- this.partitionId = partitionId;
+ public void setCubeName(String cubeName) {
+ this.cubeName = cubeName;
}
public boolean isFillGap() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
index 1cf3d98..4b4cf02 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
@@ -34,11 +34,12 @@
package org.apache.kylin.engine.streaming;
import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.metadata.realization.RealizationType;
/**
*/
public interface IStreamingInput {
- StreamingBatch getBatchWithTimeWindow(String streamingConfig, int id, long startTime, long endTime);
+ StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
index 3fbade2..6bad000 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
@@ -43,6 +43,7 @@ import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.base.Preconditions;
+import org.apache.kylin.metadata.realization.RealizationType;
/**
*/
@@ -53,23 +54,25 @@ public class OneOffStreamingBuilder {
private final StreamingBatchBuilder streamingBatchBuilder;
private final long startTime;
private final long endTime;
- private final String streamingConfig;
+ private final RealizationType realizationType;
+ private final String realizationName;
- public OneOffStreamingBuilder(String streamingConfig, long startTime, long endTime) {
+ public OneOffStreamingBuilder(RealizationType realizationType, String realizationName, long startTime, long endTime) {
Preconditions.checkArgument(startTime < endTime);
this.startTime = startTime;
this.endTime = endTime;
- this.streamingConfig = Preconditions.checkNotNull(streamingConfig);
- this.streamingInput = Preconditions.checkNotNull(StreamingUtils.getStreamingInput(streamingConfig));
- this.streamingOutput = Preconditions.checkNotNull(StreamingUtils.getStreamingOutput(streamingConfig));
- this.streamingBatchBuilder = Preconditions.checkNotNull(StreamingUtils.getMicroBatchBuilder(streamingConfig));
+ this.realizationType = Preconditions.checkNotNull(realizationType);
+ this.realizationName = Preconditions.checkNotNull(realizationName);
+ this.streamingInput = Preconditions.checkNotNull(StreamingUtils.getStreamingInput());
+ this.streamingOutput = Preconditions.checkNotNull(StreamingUtils.getStreamingOutput());
+ this.streamingBatchBuilder = Preconditions.checkNotNull(StreamingUtils.getMicroBatchBuilder(realizationType, realizationName));
}
public Runnable build() {
return new Runnable() {
@Override
public void run() {
- StreamingBatch streamingBatch = streamingInput.getBatchWithTimeWindow(streamingConfig, -1, startTime, endTime);
+ StreamingBatch streamingBatch = streamingInput.getBatchWithTimeWindow(realizationType, realizationName, -1, startTime, endTime);
final IBuildable buildable = streamingBatchBuilder.createBuildable(streamingBatch);
final Map<Long, HyperLogLogPlusCounter> samplingResult = streamingBatchBuilder.sampling(streamingBatch);
final Map<TblColRef, Dictionary<String>> dictionaryMap = streamingBatchBuilder.buildDictionary(streamingBatch, buildable);
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
index f0a7ab1..c8d1911 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
@@ -55,29 +55,24 @@ public class StreamingConfig extends RootPersistentEntity {
public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+ public static final String STREAMING_TYPE_KAFKA = "kafka";
+
@JsonProperty("name")
private String name;
- @JsonProperty("iiName")
- private String iiName;
-
- @JsonProperty("cubeName")
- private String cubeName;
+ @JsonProperty("type")
+ private String type = STREAMING_TYPE_KAFKA;
- public String getCubeName() {
- return cubeName;
+ public String getType() {
+ return type;
}
- public void setCubeName(String cubeName) {
- this.cubeName = cubeName;
+ public void setType(String type) {
+ this.type = type;
}
- public String getIiName() {
- return iiName;
- }
-
- public void setIiName(String iiName) {
- this.iiName = iiName;
+ public String getResourcePath() {
+ return concatResourcePath(name);
}
public String getName() {
@@ -88,12 +83,8 @@ public class StreamingConfig extends RootPersistentEntity {
this.name = name;
}
- public String getResourcePath() {
- return concatResourcePath(name);
- }
-
- public static String concatResourcePath(String streamingName) {
- return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + streamingName + ".json";
+ public static String concatResourcePath(String name) {
+ return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index e0b086d..f652762 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -104,6 +104,18 @@ public class StreamingManager {
}
}
+ private static String formatStreamingConfigPath(String name) {
+ return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
+ }
+
+ private static String formatStreamingOutputPath(String streaming, int partition) {
+ return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
+ }
+
+ private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
+ return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
+ }
+
public StreamingConfig getStreamingConfig(String name) {
return streamingMap.get(name);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index a73a6ac..0bab396 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -44,6 +44,7 @@ import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
+import org.apache.kylin.metadata.realization.RealizationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ public class StreamingCLI {
public static void main(String[] args) {
try {
- Preconditions.checkArgument(args[0].equals("streaming"));
+ Preconditions.checkArgument(args[0].equals("cube"));
Preconditions.checkArgument(args[1].equals("start"));
int i = 2;
@@ -69,11 +70,8 @@ public class StreamingCLI {
case "-end":
bootstrapConfig.setEnd(Long.parseLong(args[++i]));
break;
- case "-streaming":
- bootstrapConfig.setStreaming(args[++i]);
- break;
- case "-partition":
- bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
+ case "-cube":
+ bootstrapConfig.setCubeName(args[++i]);
break;
case "-fillGap":
bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
@@ -84,14 +82,13 @@ public class StreamingCLI {
i++;
}
if (bootstrapConfig.isFillGap()) {
- final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(bootstrapConfig.getStreaming());
- final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+ final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(bootstrapConfig.getCubeName());
logger.info("all gaps:" + StringUtils.join(gaps, ","));
for (Pair<Long, Long> gap : gaps) {
- startOneOffCubeStreaming(bootstrapConfig.getStreaming(), gap.getFirst(), gap.getSecond());
+ startOneOffCubeStreaming(bootstrapConfig.getCubeName(), gap.getFirst(), gap.getSecond());
}
} else {
- startOneOffCubeStreaming(bootstrapConfig.getStreaming(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
+ startOneOffCubeStreaming(bootstrapConfig.getCubeName(), bootstrapConfig.getStart(), bootstrapConfig.getEnd());
logger.info("streaming process finished, exit with 0");
System.exit(0);
}
@@ -102,8 +99,8 @@ public class StreamingCLI {
}
}
- private static void startOneOffCubeStreaming(String streaming, long start, long end) {
- final Runnable runnable = new OneOffStreamingBuilder(streaming, start, end).build();
+ private static void startOneOffCubeStreaming(String cubeName, long start, long end) {
+ final Runnable runnable = new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, end).build();
runnable.run();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
index 0ae7143..66a0af2 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
@@ -43,29 +43,27 @@ import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.engine.streaming.cube.StreamingCubeBuilder;
import com.google.common.base.Preconditions;
+import org.apache.kylin.metadata.realization.RealizationType;
/**
* TODO: like MRUtil, use Factory pattern to allow config
*/
public class StreamingUtils {
- public static IStreamingInput getStreamingInput(String streaming) {
+ public static IStreamingInput getStreamingInput() {
return (IStreamingInput) ClassUtil.newInstance("org.apache.kylin.source.kafka.KafkaStreamingInput");
}
- public static IStreamingOutput getStreamingOutput(String streaming) {
+ public static IStreamingOutput getStreamingOutput() {
return (IStreamingOutput) ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput");
}
- public static StreamingBatchBuilder getMicroBatchBuilder(String streaming) {
- final StreamingConfig streamingConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streaming);
- Preconditions.checkNotNull(streamingConfig);
- if (streamingConfig.getCubeName() != null) {
- return new StreamingCubeBuilder(streamingConfig.getCubeName());
- } else if (streamingConfig.getIiName() != null) {
- throw new UnsupportedOperationException("not implemented yet");
+ public static StreamingBatchBuilder getMicroBatchBuilder(RealizationType realizationType, String realizationName) {
+ Preconditions.checkNotNull(realizationName);
+ if (realizationType == RealizationType.CUBE) {
+ return new StreamingCubeBuilder(realizationName);
} else {
- throw new UnsupportedOperationException("StreamingConfig is not valid");
+ throw new UnsupportedOperationException("not implemented yet");
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/default.streaming_table.json b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
new file mode 100644
index 0000000..c99b8e5
--- /dev/null
+++ b/examples/test_case_data/localmeta/kafka/default.streaming_table.json
@@ -0,0 +1,21 @@
+{
+ "version":"2.1",
+ "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
+ "name": "default.streaming_table",
+ "topic": "test_streaming_table_topic_xyz",
+ "timeout": 60000,
+ "bufferSize": 65536,
+ "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
+ "last_modified": 0,
+ "clusters": [
+ {
+ "brokers": [
+ {
+ "id": 0,
+ "host": "sandbox",
+ "port": 6667
+ }
+ ]
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/kafka_test.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/kafka_test.json b/examples/test_case_data/localmeta/kafka/kafka_test.json
deleted file mode 100644
index a20f71e..0000000
--- a/examples/test_case_data/localmeta/kafka/kafka_test.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "version":"2.1",
- "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d",
- "name": "kafka_test",
- "topic": "kafka_stream_test",
- "timeout": 60000,
- "bufferSize": 65536,
- "last_modified": 0,
- "clusters": [
- {
- "brokers": [
- {
- "id": 0,
- "host": "sandbox.hortonworks.com",
- "port": 6667
- }
- ]
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json b/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
deleted file mode 100644
index 554fa62..0000000
--- a/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
- "version":"2.1",
- "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
- "name": "test_streaming_table_cube",
- "topic": "test_streaming_table_topic_xyz",
- "timeout": 60000,
- "bufferSize": 65536,
- "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
- "partition": 1,
- "last_modified": 0,
- "clusters": [
- {
- "brokers": [
- {
- "id": 0,
- "host": "sandbox",
- "port": 6667
- }
- ]
- }
- ]
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json b/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
deleted file mode 100644
index b6f18c7..0000000
--- a/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
- "version":"2.1",
- "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322",
- "name": "test_streaming_table_ii",
- "topic": "test_streaming_table_topic_xyz",
- "timeout": 60000,
- "bufferSize": 65536,
- "parserName": "org.apache.kylin.source.kafka.JsonStreamParser",
- "partition": 1,
- "last_modified": 0,
- "clusters": [
- {
- "brokers": [
- {
- "id": 0,
- "host": "sandbox",
- "port": 6667
- }
- ]
- }
- ]
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/default.streaming_table.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/default.streaming_table.json b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
new file mode 100644
index 0000000..6eb4a88
--- /dev/null
+++ b/examples/test_case_data/localmeta/streaming/default.streaming_table.json
@@ -0,0 +1,6 @@
+{
+ "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
+ "name": "default.streaming_table",
+ "type": "kafka",
+ "last_modified": 0
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/kafka_test.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/kafka_test.json b/examples/test_case_data/localmeta/streaming/kafka_test.json
deleted file mode 100644
index a20f71e..0000000
--- a/examples/test_case_data/localmeta/streaming/kafka_test.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "version":"2.1",
- "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d",
- "name": "kafka_test",
- "topic": "kafka_stream_test",
- "timeout": 60000,
- "bufferSize": 65536,
- "last_modified": 0,
- "clusters": [
- {
- "brokers": [
- {
- "id": 0,
- "host": "sandbox.hortonworks.com",
- "port": 6667
- }
- ]
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
deleted file mode 100644
index ecf0511..0000000
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
+++ /dev/null
@@ -1,8 +0,0 @@
-{
- "version":"2.1",
- "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
- "name": "test_streaming_table_cube",
- "cubeName": "test_streaming_table_cube",
- "partition": 1,
- "last_modified": 0
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
deleted file mode 100644
index 022ab70..0000000
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
+++ /dev/null
@@ -1,8 +0,0 @@
-{
- "version":"2.1",
- "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322",
- "name": "test_streaming_table_ii",
- "iiName": "test_streaming_table_ii",
- "partition": 1,
- "last_modified": 0
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index 27226e7..eeff999 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -27,10 +27,13 @@ import org.apache.kylin.common.util.AbstractKylinTestCase;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.job.DeployUtil;
+import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
@@ -42,7 +45,7 @@ import org.slf4j.LoggerFactory;
public class BuildCubeWithStream {
private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class);
- private static final String streamingName = "test_streaming_table_cube";
+ private static final String cubeName = "test_streaming_table_cube";
private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00");
private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00");
private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours
@@ -75,15 +78,16 @@ public class BuildCubeWithStream {
DeployUtil.overrideJobJarLocations();
kylinConfig = KylinConfig.getInstanceFromEnv();
-
- final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName);
+ final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ final String factTable = cubeInstance.getFactTable();
+ final StreamingConfig config = StreamingManager.getInstance(kylinConfig).getStreamingConfig(factTable);
//Use a random topic for kafka data stream
- KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingName);
+ KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(config.getName());
streamingConfig.setTopic(UUID.randomUUID().toString());
KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig);
- DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, config.getCubeName(), streamingConfig);
+ DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, streamingConfig);
}
public static void afterClass() throws Exception {
@@ -94,7 +98,7 @@ public class BuildCubeWithStream {
logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval));
for (long start = startTime; start < endTime; start += batchInterval) {
logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval));
- new OneOffStreamingBuilder(streamingName, start, start + batchInterval).build().run();
+ new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, start + batchInterval).build().run();
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 9afa750..e60f330 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -284,92 +284,6 @@ public class CubeController extends BasicController {
throw new InternalErrorException("Failed to clone cube ", e);
}
- boolean isStreamingCube = false, cloneStreamingConfigSuccess = false, cloneKafkaConfigSuccess = false;
-
-
- List<StreamingConfig> streamingConfigs = null;
- try {
- streamingConfigs = streamingService.listAllStreamingConfigs(cubeName);
- if (streamingConfigs.size() != 0) {
- isStreamingCube = true;
- }
-
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- StreamingConfig newStreamingConfig = null;
- KafkaConfig newKafkaConfig = null;
-
- try {
-
- if (isStreamingCube) {
-
- isStreamingCube = true;
- newStreamingConfig = streamingConfigs.get(0).clone();
- newStreamingConfig.setName(newCubeName + "_STREAMING");
- newStreamingConfig.updateRandomUuid();
- newStreamingConfig.setLastModified(0);
- newStreamingConfig.setCubeName(newCubeName);
- try {
- streamingService.createStreamingConfig(newStreamingConfig);
- cloneStreamingConfigSuccess = true;
- } catch (IOException e) {
- throw new InternalErrorException("Failed to clone streaming config. ", e);
- }
-
- //StreamingConfig name and KafkaConfig name is the same for same cube
- String kafkaConfigName = streamingConfigs.get(0).getName();
- KafkaConfig kafkaConfig = null;
- try {
- kafkaConfig = kafkaConfigService.getKafkaConfig(kafkaConfigName);
- if (kafkaConfig != null) {
- newKafkaConfig = kafkaConfig.clone();
- newKafkaConfig.setName(newStreamingConfig.getName());
- newKafkaConfig.setLastModified(0);
- newKafkaConfig.updateRandomUuid();
- }
- } catch (IOException e) {
- throw new InternalErrorException("Failed to get kafka config info. ", e);
- }
-
- try {
- kafkaConfigService.createKafkaConfig(newKafkaConfig);
- cloneKafkaConfigSuccess = true;
- } catch (IOException e) {
- throw new InternalErrorException("Failed to clone streaming config. ", e);
- }
- }
- } finally {
-
- //rollback if failed
- if (isStreamingCube) {
- if (cloneStreamingConfigSuccess == false || cloneKafkaConfigSuccess == false) {
- try {
- cubeService.deleteCube(newCube);
- } catch (Exception ex) {
- throw new InternalErrorException("Failed, and failed to rollback on delete cube. " + " Caused by: " + ex.getMessage(), ex);
- }
- if (cloneStreamingConfigSuccess == true) {
- try {
- streamingService.dropStreamingConfig(newStreamingConfig);
- } catch (IOException e) {
- throw new InternalErrorException("Failed to clone cube, and StreamingConfig created and failed to delete: " + e.getLocalizedMessage());
- }
- }
- if (cloneKafkaConfigSuccess == true) {
- try {
- kafkaConfigService.dropKafkaConfig(newKafkaConfig);
- } catch (IOException e) {
- throw new InternalErrorException("Failed to clone cube, and KafkaConfig created and failed to delete: " + e.getLocalizedMessage());
- }
- }
-
- }
-
- }
- }
-
return newCube;
}
@@ -400,27 +314,6 @@ public class CubeController extends BasicController {
throw new NotFoundException("Cube with name " + cubeName + " not found..");
}
- //drop related StreamingConfig KafkaConfig if exist
- try {
- List<StreamingConfig> configs = streamingService.listAllStreamingConfigs(cubeName);
- for (StreamingConfig config : configs) {
- try {
- streamingService.dropStreamingConfig(config);
- } catch (IOException e) {
- logger.error(e.getLocalizedMessage(), e);
- throw new InternalErrorException("Failed to delete StreamingConfig. " + " Caused by: " + e.getMessage(), e);
- }
- try {
- KafkaConfig kfkConfig = kafkaConfigService.getKafkaConfig(config.getName());
- kafkaConfigService.dropKafkaConfig(kfkConfig);
- } catch (IOException e) {
- throw new InternalErrorException("Failed to delete KafkaConfig. " + " Caused by: " + e.getMessage(), e);
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
-
//drop Cube
try {
cubeService.deleteCube(cube);
@@ -587,133 +480,6 @@ public class CubeController extends BasicController {
return cubeRequest;
}
- boolean updateStreamingConfigSuccess = false, updateKafkaConfigSuccess = false;
-
- boolean isStreamingCube = cubeRequest.getStreamingCube() != null && cubeRequest.getStreamingCube().equals("true");
-
- //oldConfig is for recover use
- StreamingConfig streamingConfig = null, oldStreamingConfig = null;
- KafkaConfig kafkaConfig = null, oldKafkaConfig = null;
- if (isStreamingCube) {
- streamingConfig = deserializeStreamingDesc(cubeRequest);
- kafkaConfig = deserializeKafkaDesc(cubeRequest);
- try {
- oldKafkaConfig = kafkaConfigService.getKafkaConfig(kafkaConfig.getName());
- } catch (IOException e) {
- e.printStackTrace();
- }
- oldStreamingConfig = streamingService.getStreamingManager().getStreamingConfig(streamingConfig.getName());
- }
- try {
- //streaming Cube
- if (isStreamingCube) {
- if (streamingConfig == null) {
- cubeRequest.setMessage("No StreamingConfig info to update.");
- return cubeRequest;
- }
- if (kafkaConfig == null) {
- cubeRequest.setMessage("No KafkaConfig info to update.");
- return cubeRequest;
- }
-
- if (oldStreamingConfig == null) {
- streamingConfig.setUuid(UUID.randomUUID().toString());
- try {
- streamingService.createStreamingConfig(streamingConfig);
- updateStreamingConfigSuccess = true;
- } catch (IOException e) {
- logger.error("Failed to add StreamingConfig:" + e.getLocalizedMessage(), e);
- throw new InternalErrorException("Failed to add StreamingConfig: " + e.getLocalizedMessage());
- }
- } else {
- try {
- streamingConfig = streamingService.updateStreamingConfig(streamingConfig);
- updateStreamingConfigSuccess = true;
-
- } catch (IOException e) {
- logger.error("Failed to update StreamingConfig:" + e.getLocalizedMessage(), e);
- throw new InternalErrorException("Failed to update StreamingConfig: " + e.getLocalizedMessage());
- }
- }
- if (oldKafkaConfig == null) {
- kafkaConfig.setUuid(UUID.randomUUID().toString());
- try {
- kafkaConfigService.createKafkaConfig(kafkaConfig);
- updateKafkaConfigSuccess = true;
- } catch (IOException e) {
- logger.error("Failed to add KafkaConfig:" + e.getLocalizedMessage(), e);
- throw new InternalErrorException("Failed to add KafkaConfig: " + e.getLocalizedMessage());
- }
-
- } else {
- try {
- kafkaConfig = kafkaConfigService.updateKafkaConfig(kafkaConfig);
- updateKafkaConfigSuccess = true;
- } catch (IOException e) {
- logger.error("Failed to update KafkaConfig:" + e.getLocalizedMessage(), e);
- throw new InternalErrorException("Failed to update KafkaConfig: " + e.getLocalizedMessage());
- }
- }
-
- }
- } finally {
- if (isStreamingCube) {
- //recover cube desc
- if (updateStreamingConfigSuccess == false || updateKafkaConfigSuccess == false) {
- oldCubeDesc.setLastModified(desc.getLastModified());
- CubeInstance cube = cubeService.getCubeManager().getCube(cubeRequest.getCubeName());
- try {
- desc = cubeService.updateCubeAndDesc(cube, oldCubeDesc, projectName);
- } catch (Exception e) {
- logger.error("Failed to recover CubeDesc:" + e.getLocalizedMessage(), e);
- throw new InternalErrorException("Failed to recover CubeDesc: " + e.getLocalizedMessage());
- }
-
- if (updateStreamingConfigSuccess == true) {
-
- if (oldStreamingConfig != null) {
-
- oldStreamingConfig.setLastModified(streamingConfig.getLastModified());
- try {
- streamingService.updateStreamingConfig(oldStreamingConfig);
- } catch (IOException e) {
- logger.error("Failed to recover StreamingConfig:" + e.getLocalizedMessage(), e);
- throw new InternalErrorException("Failed to recover StreamingConfig: " + e.getLocalizedMessage());
- }
- } else {
- try {
- streamingService.dropStreamingConfig(streamingConfig);
- } catch (IOException e) {
- logger.error("Failed to remove added StreamingConfig:" + e.getLocalizedMessage(), e);
- throw new InternalErrorException("Failed to remove added StreamingConfig: " + e.getLocalizedMessage());
- }
- }
- }
-
- if (updateKafkaConfigSuccess == true) {
- if (oldKafkaConfig != null) {
- oldKafkaConfig.setLastModified(kafkaConfig.getLastModified());
- try {
- kafkaConfigService.updateKafkaConfig(oldKafkaConfig);
- } catch (IOException e) {
- logger.error("Failed to recover KafkaConfig:" + e.getLocalizedMessage(), e);
- throw new InternalErrorException("Failed to recover KafkaConfig: " + e.getLocalizedMessage());
- }
- } else {
- try {
- kafkaConfigService.dropKafkaConfig(kafkaConfig);
- } catch (IOException e) {
- logger.error("Failed to remove added KafkaConfig:" + e.getLocalizedMessage(), e);
- throw new InternalErrorException("Failed to remove added KafkaConfig: " + e.getLocalizedMessage());
- }
- }
- }
-
- }
- }
-
- }
-
String descData = JsonUtil.writeValueAsIndentString(desc);
cubeRequest.setCubeDescData(descData);
cubeRequest.setSuccessful(true);
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index e22bd30..ecd7571 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -60,9 +60,9 @@ public class StreamingController extends BasicController {
@RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
@ResponseBody
- public List<StreamingConfig> getStreamings(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
+ public List<StreamingConfig> getStreamings(@RequestParam(value = "table", required = false) String table, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) {
try {
- return streamingService.getStreamingConfigs(cubeName, limit, offset);
+ return streamingService.getStreamingConfigs(table, limit, offset);
} catch (IOException e) {
logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
index e40426b..a0473e9 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/StreamingService.java
@@ -18,6 +18,8 @@
package org.apache.kylin.rest.service;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.rest.constant.Constant;
@@ -37,26 +39,22 @@ public class StreamingService extends BasicService {
private AccessService accessService;
@PostFilter(Constant.ACCESS_POST_FILTER_READ)
- public List<StreamingConfig> listAllStreamingConfigs(final String cubeName) throws IOException {
+ public List<StreamingConfig> listAllStreamingConfigs(final String table) throws IOException {
List<StreamingConfig> streamingConfigs = new ArrayList();
- CubeInstance cubeInstance = (null != cubeName) ? getCubeManager().getCube(cubeName) : null;
- if (null == cubeInstance) {
+ if (StringUtils.isEmpty(table)) {
streamingConfigs = getStreamingManager().listAllStreaming();
} else {
- for(StreamingConfig config : getStreamingManager().listAllStreaming()){
- if(cubeInstance.getName().equals(config.getCubeName())){
- streamingConfigs.add(config);
- }
- }
+ StreamingConfig config = getStreamingManager().getConfig(table);
+ streamingConfigs.add(config);
}
return streamingConfigs;
}
- public List<StreamingConfig> getStreamingConfigs(final String cubeName, final Integer limit, final Integer offset) throws IOException {
+ public List<StreamingConfig> getStreamingConfigs(final String table, final Integer limit, final Integer offset) throws IOException {
List<StreamingConfig> streamingConfigs;
- streamingConfigs = listAllStreamingConfigs(cubeName);
+ streamingConfigs = listAllStreamingConfigs(table);
if (limit == null || offset == null) {
return streamingConfigs;
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 2e262b3..c05119f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -47,9 +47,14 @@ import kafka.message.MessageAndOffset;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.streaming.IStreamingInput;
import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.util.KafkaRequester;
@@ -65,39 +70,54 @@ public class KafkaStreamingInput implements IStreamingInput {
private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingInput.class);
@Override
- public StreamingBatch getBatchWithTimeWindow(String streaming, int id, long startTime, long endTime) {
- try {
+ public StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime) {
+ if (realizationType != RealizationType.CUBE) {
+ throw new IllegalArgumentException("Unsupported realization in KafkaStreamingInput: " + realizationType);
+ }
+ final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(realizationName);
+ final String streaming = cube.getFactTable();
+ final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
+ final StreamingConfig streamingConfig = streamingManager.getConfig(streaming);
+ if (streamingConfig == null) {
+ throw new IllegalArgumentException("Table " + streaming + " is not a streaming table.");
+ }
+ if (StreamingConfig.STREAMING_TYPE_KAFKA.equals(streamingConfig.getType())) {
logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime));
- final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
- final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
- final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig);
- final ExecutorService executorService = Executors.newCachedThreadPool();
- final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
- for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
- final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
- for (int i = 0; i < partitionCount; ++i) {
- final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser);
- final Future<List<StreamingMessage>> future = executorService.submit(producer);
- futures.add(future);
+
+ try {
+ final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
+ final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
+ final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig, realizationType, realizationName);
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+ final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
+ for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
+ final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+ for (int i = 0; i < partitionCount; ++i) {
+ final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser);
+ final Future<List<StreamingMessage>> future = executorService.submit(producer);
+ futures.add(future);
+ }
}
- }
- List<StreamingMessage> messages = Lists.newLinkedList();
- for (Future<List<StreamingMessage>> future : futures) {
- try {
- messages.addAll(future.get());
- } catch (InterruptedException e) {
- logger.warn("this thread should not be interrupted, just ignore", e);
- continue;
- } catch (ExecutionException e) {
- throw new RuntimeException("error when get StreamingMessages",e.getCause());
+ List<StreamingMessage> messages = Lists.newLinkedList();
+ for (Future<List<StreamingMessage>> future : futures) {
+ try {
+ messages.addAll(future.get());
+ } catch (InterruptedException e) {
+ logger.warn("this thread should not be interrupted, just ignore", e);
+ continue;
+ } catch (ExecutionException e) {
+ throw new RuntimeException("error when get StreamingMessages", e.getCause());
+ }
}
+ final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
+ logger.info("finish to get streaming batch, total message count:" + messages.size());
+ return new StreamingBatch(messages, timeRange);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("failed to create instance of StreamingParser", e);
}
- final Pair<Long, Long> timeRange = Pair.newPair(startTime, endTime);
- logger.info("finish to get streaming batch, total message count:" + messages.size());
- return new StreamingBatch(messages, timeRange);
- } catch (ReflectiveOperationException e) {
- throw new RuntimeException("failed to create instance of StreamingParser", e);
+ } else {
+ throw new IllegalArgumentException("kafka is the only supported streaming type.");
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/9021f17d/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 3455f1d..7b326e2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -50,6 +50,7 @@ import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.IntermediateColumnDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import com.google.common.base.Function;
@@ -68,9 +69,8 @@ public abstract class StreamingParser {
abstract public boolean filter(StreamingMessage streamingMessage);
- public static StreamingParser getStreamingParser(KafkaConfig kafkaConfig) throws ReflectiveOperationException {
- final String cubeName = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(kafkaConfig.getName()).getCubeName();
- final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+ public static StreamingParser getStreamingParser(KafkaConfig kafkaConfig, RealizationType realizationType, String realizationName) throws ReflectiveOperationException {
+ final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(realizationName);
List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
@Nullable
@Override