You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/02 08:18:44 UTC
[2/2] incubator-kylin git commit: StreamBuilder: support parser and
filter conditions.
StreamBuilder: support parser and filter conditions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/26e4ff7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/26e4ff7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/26e4ff7e
Branch: refs/heads/0.8.0
Commit: 26e4ff7e0a51822bd85249765aa158b57a2175d5
Parents: 8cebe49
Author: honma <ho...@ebay.com>
Authored: Tue Jun 2 14:14:05 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 2 14:18:34 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/StreamingBootstrap.java | 10 +++++
.../kylin/job/hadoop/invertedindex/IITest.java | 44 ++++++++++++++++----
.../org/apache/kylin/streaming/KafkaConfig.java | 12 ++++++
3 files changed, 59 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 0c67d78..ee00880 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -166,6 +166,7 @@ public class StreamingBootstrap {
});
CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(streamQueue, cubeName);
cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
+ cubeStreamBuilder.setStreamFilter(getStreamFilter(kafkaConfig));
final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
future.get();
}
@@ -180,6 +181,15 @@ public class StreamingBootstrap {
}
}
+ private StreamFilter getStreamFilter(KafkaConfig kafkaConfig) throws Exception {
+ if (!StringUtils.isEmpty(kafkaConfig.getFilterName())) {
+ Class clazz = Class.forName(kafkaConfig.getFilterName());
+ return (StreamFilter) clazz.newInstance();
+ } else {
+ return DefaultStreamFilter.instance;
+ }
+ }
+
private void startIIStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 390017f..7915080 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -41,9 +41,7 @@ import org.apache.kylin.storage.hbase.coprocessor.endpoint.ClearTextDictionary;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointAggregators;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StringStreamParser;
+import org.apache.kylin.streaming.*;
import org.apache.kylin.streaming.invertedindex.SliceBuilder;
import org.junit.After;
import org.junit.Assert;
@@ -76,16 +74,48 @@ public class IITest extends LocalFileMetadataTestCase {
this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
this.iiDesc = ii.getDescriptor();
- List<List<String>> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, List<String>>() {
+ List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
@Nullable
@Override
- public List<String> apply(@Nullable String input) {
- return StringStreamParser.instance.parse(new StreamMessage(System.currentTimeMillis(), input.getBytes())).getStreamMessage();
+ public StreamMessage apply(String input) {
+ return new StreamMessage(System.currentTimeMillis(), input.getBytes());
}
});
+ List<List<String>> parsedStreamMessages = Lists.newArrayList();
+ StreamParser parser = StringStreamParser.instance;
+ StreamFilter filter = DefaultStreamFilter.instance;
+ long startOffset = Long.MAX_VALUE;
+ long endOffset = Long.MIN_VALUE;
+ long startTimestamp = Long.MAX_VALUE;
+ long endTimestamp = Long.MIN_VALUE;
+
+ for(StreamMessage message: streamMessages)
+ {
+ ParsedStreamMessage parsedStreamMessage = parser.parse(message);
+ if(filter.filter(parsedStreamMessage))
+ {
+ if (startOffset > parsedStreamMessage.getOffset()) {
+ startOffset = parsedStreamMessage.getOffset();
+ }
+ if (endOffset < parsedStreamMessage.getOffset()) {
+ endOffset = parsedStreamMessage.getOffset();
+ }
+ if (startTimestamp > parsedStreamMessage.getTimestamp()) {
+ startTimestamp = parsedStreamMessage.getTimestamp();
+ }
+ if (endTimestamp < parsedStreamMessage.getTimestamp()) {
+ endTimestamp = parsedStreamMessage.getTimestamp();
+ }
+ parsedStreamMessages.add(parsedStreamMessage.getStreamMessage());
+ }
+ }
+
+ MicroStreamBatch batch = new MicroStreamBatch(parsedStreamMessages, org.apache.kylin.common.util.Pair.newPair(startTimestamp, endTimestamp), org.apache.kylin.common.util.Pair.newPair(startOffset, endOffset));
+
+
iiRows = Lists.newArrayList();
- final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice(new MicroStreamBatch(streamMessages, org.apache.kylin.common.util.Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis()), org.apache.kylin.common.util.Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
+ final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));
IIKeyValueCodec codec = new IIKeyValueCodec(slice.getInfo());
for (IIRow iiRow : codec.encodeKeyValue(slice)) {
iiRows.add(iiRow);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
index b6f5025..f0f3b6f 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -81,6 +81,22 @@ public class KafkaConfig extends RootPersistentEntity {
@JsonProperty("parserName")
private String parserName;
+    /**
+     * Fully qualified class name of the stream filter; when null or empty, StreamingBootstrap uses DefaultStreamFilter.
+     */
+    @JsonProperty("filterName")
+    private String filterName;
+
+    /** @return the configured stream filter class name, possibly {@code null} */
+    public String getFilterName() {
+        return this.filterName;
+    }
+
+    /** @param filterName fully qualified class name of the stream filter to use */
+    public void setFilterName(String filterName) {
+        this.filterName = filterName;
+    }
+
public String getParserName() {
return parserName;
}