You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/02 08:18:44 UTC

[2/2] incubator-kylin git commit: StreamBuilder supports parser and filter conditions.

StreamBuilder supports parser and filter conditions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/26e4ff7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/26e4ff7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/26e4ff7e

Branch: refs/heads/0.8.0
Commit: 26e4ff7e0a51822bd85249765aa158b57a2175d5
Parents: 8cebe49
Author: honma <ho...@ebay.com>
Authored: Tue Jun 2 14:14:05 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 2 14:18:34 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/StreamingBootstrap.java | 10 +++++
 .../kylin/job/hadoop/invertedindex/IITest.java  | 44 ++++++++++++++++----
 .../org/apache/kylin/streaming/KafkaConfig.java | 12 ++++++
 3 files changed, 59 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 0c67d78..ee00880 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -166,6 +166,7 @@ public class StreamingBootstrap {
         });
         CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(streamQueue, cubeName);
         cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
+        cubeStreamBuilder.setStreamFilter(getStreamFilter(kafkaConfig));
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
         future.get();
     }
@@ -180,6 +181,15 @@ public class StreamingBootstrap {
         }
     }
 
+    private StreamFilter getStreamFilter(KafkaConfig kafkaConfig) throws Exception {
+        if (!StringUtils.isEmpty(kafkaConfig.getFilterName())) {
+            Class clazz = Class.forName(kafkaConfig.getFilterName());
+            return (StreamFilter) clazz.newInstance();
+        } else {
+            return DefaultStreamFilter.instance;
+        }
+    }
+
     private void startIIStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
         final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
         Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 390017f..7915080 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -41,9 +41,7 @@ import org.apache.kylin.storage.hbase.coprocessor.endpoint.ClearTextDictionary;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointAggregators;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StringStreamParser;
+import org.apache.kylin.streaming.*;
 import org.apache.kylin.streaming.invertedindex.SliceBuilder;
 import org.junit.After;
 import org.junit.Assert;
@@ -76,16 +74,48 @@ public class IITest extends LocalFileMetadataTestCase {
         this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
         this.iiDesc = ii.getDescriptor();
 
-        List<List<String>> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, List<String>>() {
+        List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
             @Nullable
             @Override
-            public List<String> apply(@Nullable String input) {
-                return StringStreamParser.instance.parse(new StreamMessage(System.currentTimeMillis(), input.getBytes())).getStreamMessage();
+            public StreamMessage apply(String input) {
+                return new StreamMessage(System.currentTimeMillis(), input.getBytes());
             }
         });
 
+        List<List<String>> parsedStreamMessages = Lists.newArrayList();
+        StreamParser parser = StringStreamParser.instance;
+        StreamFilter filter = DefaultStreamFilter.instance;
+        long startOffset = Long.MAX_VALUE;
+        long endOffset = Long.MIN_VALUE;
+        long startTimestamp = Long.MAX_VALUE;
+        long endTimestamp = Long.MIN_VALUE;
+
+        for(StreamMessage message: streamMessages)
+        {
+            ParsedStreamMessage parsedStreamMessage = parser.parse(message);
+            if(filter.filter(parsedStreamMessage))
+            {
+                if (startOffset > parsedStreamMessage.getOffset()) {
+                    startOffset = parsedStreamMessage.getOffset();
+                }
+                if (endOffset < parsedStreamMessage.getOffset()) {
+                    endOffset = parsedStreamMessage.getOffset();
+                }
+                if (startTimestamp > parsedStreamMessage.getTimestamp()) {
+                    startTimestamp = parsedStreamMessage.getTimestamp();
+                }
+                if (endTimestamp < parsedStreamMessage.getTimestamp()) {
+                    endTimestamp = parsedStreamMessage.getTimestamp();
+                }
+                parsedStreamMessages.add(parsedStreamMessage.getStreamMessage());
+            }
+        }
+
+        MicroStreamBatch batch = new MicroStreamBatch(parsedStreamMessages, org.apache.kylin.common.util.Pair.newPair(startTimestamp, endTimestamp), org.apache.kylin.common.util.Pair.newPair(startOffset, endOffset));
+
+
         iiRows = Lists.newArrayList();
-        final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice(new MicroStreamBatch(streamMessages, org.apache.kylin.common.util.Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis()), org.apache.kylin.common.util.Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
+        final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));
         IIKeyValueCodec codec = new IIKeyValueCodec(slice.getInfo());
         for (IIRow iiRow : codec.encodeKeyValue(slice)) {
             iiRows.add(iiRow);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
index b6f5025..f0f3b6f 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -81,6 +81,18 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("parserName")
     private String parserName;
 
+    @JsonProperty("filterName")
+    private String filterName;
+
+    public String getFilterName() {
+        return filterName;
+    }
+
+    public void setFilterName(String filterName) {
+        this.filterName = filterName;
+    }
+
+
     public String getParserName() {
         return parserName;
     }