You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/02 08:18:43 UTC

[1/2] incubator-kylin git commit: KYLIN-801 use new ehcache conf

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 61c7115d4 -> 26e4ff7e0


KYLIN-801 use new ehcache conf


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8cebe49d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8cebe49d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8cebe49d

Branch: refs/heads/0.8.0
Commit: 8cebe49d7c2e726fc4dcbb42474a8f620a22ff5a
Parents: 61c7115
Author: honma <ho...@ebay.com>
Authored: Tue Jun 2 11:10:20 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 2 14:14:17 2015 +0800

----------------------------------------------------------------------
 .../test_kylin_cube_with_slr_desc.json          |  1 -
 ...test_kylin_cube_with_slr_left_join_desc.json |  1 -
 .../test_kylin_cube_without_slr_desc.json       |  1 -
 ...t_kylin_cube_without_slr_left_join_desc.json |  1 -
 .../kylin/job/streaming/CubeStreamBuilder.java  | 42 +++++++++-----------
 server/src/main/resources/ehcache-test.xml      | 15 ++++---
 server/src/main/resources/ehcache.xml           | 15 ++++---
 7 files changed, 36 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
index fecb65b..eeaa049 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
@@ -150,7 +150,6 @@
     } ],
     "aggregation_groups" : [ [ "leaf_categ_id", "meta_categ_name", "categ_lvl2_name", "categ_lvl3_name", "cal_dt" ] ]
   },
-  "signature" : "lsLAl2jL62ZApmOLZqWU3g==",
   "last_modified" : 1422435345330,
   "model_name" : "test_kylin_inner_join_model_desc",
   "null_string" : null,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
index e06ce27..720c87f 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
@@ -150,7 +150,6 @@
     } ],
     "aggregation_groups" : [ [ "leaf_categ_id", "meta_categ_name", "categ_lvl2_name", "categ_lvl3_name", "cal_dt" ] ]
   },
-  "signature" : "ljba0vaTnt00lU4rdhG9Xw==",
   "last_modified" : 1422435345352,
   "model_name" : "test_kylin_left_join_model_desc",
   "null_string" : null,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
index 9534de4..4d75837 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
@@ -174,7 +174,6 @@
     } ],
     "aggregation_groups" : [ [ "lstg_format_name", "lstg_site_id", "slr_segment_cd" ], [ "leaf_categ_id", "meta_categ_name", "categ_lvl3_name", "categ_lvl2_name", "lstg_format_name" ] ]
   },
-  "signature" : "7qJiv2MEGoGf1AAgksQmZw==",
   "last_modified" : 1422435345362,
   "model_name" : "test_kylin_inner_join_model_desc",
   "null_string" : null,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
index 1e2fc9e..06fb7b8 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
@@ -174,7 +174,6 @@
     } ],
     "aggregation_groups" : [ [ "lstg_format_name", "lstg_site_id", "slr_segment_cd" ], [ "leaf_categ_id", "meta_categ_name", "categ_lvl3_name", "categ_lvl2_name", "lstg_format_name" ] ]
   },
-  "signature" : "TPdd/nnYLZnsI8TtZzoCng==",
   "last_modified" : 1422435345373,
   "model_name" : "test_kylin_left_join_model_desc",
   "null_string" : null,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 79fd674..3986db6 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -1,21 +1,13 @@
 package org.apache.kylin.job.streaming;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,14 +54,14 @@ import org.apache.kylin.streaming.StreamMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  */
@@ -342,12 +334,14 @@ public class CubeStreamBuilder extends StreamBuilder {
         return result;
     }
 
+    //TODO: should we use cubeManager.promoteNewlyBuiltSegments?
     private void commitSegment(CubeSegment cubeSegment) throws IOException {
         cubeSegment.setStatus(SegmentStatusEnum.READY);
 
         CubeInstance cube = CubeManager.getInstance(kylinConfig).reloadCubeLocal(cubeSegment.getCubeInstance().getName());
         cube.getSegments().add(cubeSegment);
         Collections.sort(cube.getSegments());
+
         CubeManager.getInstance(kylinConfig).updateCube(cube, false);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/server/src/main/resources/ehcache-test.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/ehcache-test.xml b/server/src/main/resources/ehcache-test.xml
index 43940e0..8ee2246 100644
--- a/server/src/main/resources/ehcache-test.xml
+++ b/server/src/main/resources/ehcache-test.xml
@@ -1,23 +1,26 @@
 <ehcache maxBytesLocalHeap="10M">>
     <cache name="StorageCache"
            eternal="false"
-           overflowToDisk="false"
            timeToIdleSeconds="86400"
            memoryStoreEvictionPolicy="LRU"
            maxBytesLocalHeap="1M"
-            />
+            >
+        <persistence strategy="none"/>
+    </cache>
     <cache name="ExceptionQueryCache"
            eternal="false"
-           overflowToDisk="false"
            timeToIdleSeconds="86400"
            memoryStoreEvictionPolicy="LRU"
            maxBytesLocalHeap="1M"
-            />
+            >
+        <persistence strategy="none"/>
+    </cache>
     <cache name="UserCache"
            eternal="false"
-           overflowToDisk="false"
            timeToLiveSeconds="10800"
            memoryStoreEvictionPolicy="LRU"
            maxBytesLocalHeap="1M"
-            />
+            >
+        <persistence strategy="none"/>
+    </cache>
 </ehcache>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/server/src/main/resources/ehcache.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/ehcache.xml b/server/src/main/resources/ehcache.xml
index ece1845..f8fc68d 100644
--- a/server/src/main/resources/ehcache.xml
+++ b/server/src/main/resources/ehcache.xml
@@ -1,23 +1,26 @@
 <ehcache maxBytesLocalHeap="2048M">
     <cache name="StorageCache"
            eternal="false"
-           overflowToDisk="false"
            timeToIdleSeconds="86400"
            memoryStoreEvictionPolicy="LRU"
            maxBytesLocalHeap="500M"
-            />
+            >
+        <persistence strategy="none"/>
+    </cache>
     <cache name="ExceptionQueryCache"
            eternal="false"
-           overflowToDisk="false"
            timeToIdleSeconds="86400"
            memoryStoreEvictionPolicy="LRU"
            maxBytesLocalHeap="100M"
-            />
+            >
+        <persistence strategy="none"/>
+    </cache>
     <cache name="UserCache"
            eternal="false"
-           overflowToDisk="false"
            timeToLiveSeconds="10800"
            memoryStoreEvictionPolicy="LRU"
            maxBytesLocalHeap="100M"
-            />
+            >
+        <persistence strategy="none"/>
+    </cache>
 </ehcache>
\ No newline at end of file


[2/2] incubator-kylin git commit: StreamBuilder supports parser and filter conditions.

Posted by ma...@apache.org.
StreamBuilder supports parser and filter conditions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/26e4ff7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/26e4ff7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/26e4ff7e

Branch: refs/heads/0.8.0
Commit: 26e4ff7e0a51822bd85249765aa158b57a2175d5
Parents: 8cebe49
Author: honma <ho...@ebay.com>
Authored: Tue Jun 2 14:14:05 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 2 14:18:34 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/StreamingBootstrap.java | 10 +++++
 .../kylin/job/hadoop/invertedindex/IITest.java  | 44 ++++++++++++++++----
 .../org/apache/kylin/streaming/KafkaConfig.java | 12 ++++++
 3 files changed, 59 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 0c67d78..ee00880 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -166,6 +166,7 @@ public class StreamingBootstrap {
         });
         CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(streamQueue, cubeName);
         cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
+        cubeStreamBuilder.setStreamFilter(getStreamFilter(kafkaConfig));
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
         future.get();
     }
@@ -180,6 +181,15 @@ public class StreamingBootstrap {
         }
     }
 
+    private StreamFilter getStreamFilter(KafkaConfig kafkaConfig) throws Exception {
+        if (!StringUtils.isEmpty(kafkaConfig.getFilterName())) {
+            Class clazz = Class.forName(kafkaConfig.getFilterName());
+            return (StreamFilter) clazz.newInstance();
+        } else {
+            return DefaultStreamFilter.instance;
+        }
+    }
+
     private void startIIStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
         final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
         Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 390017f..7915080 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -41,9 +41,7 @@ import org.apache.kylin.storage.hbase.coprocessor.endpoint.ClearTextDictionary;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointAggregators;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StringStreamParser;
+import org.apache.kylin.streaming.*;
 import org.apache.kylin.streaming.invertedindex.SliceBuilder;
 import org.junit.After;
 import org.junit.Assert;
@@ -76,16 +74,48 @@ public class IITest extends LocalFileMetadataTestCase {
         this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
         this.iiDesc = ii.getDescriptor();
 
-        List<List<String>> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, List<String>>() {
+        List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
             @Nullable
             @Override
-            public List<String> apply(@Nullable String input) {
-                return StringStreamParser.instance.parse(new StreamMessage(System.currentTimeMillis(), input.getBytes())).getStreamMessage();
+            public StreamMessage apply(String input) {
+                return new StreamMessage(System.currentTimeMillis(), input.getBytes());
             }
         });
 
+        List<List<String>> parsedStreamMessages = Lists.newArrayList();
+        StreamParser parser = StringStreamParser.instance;
+        StreamFilter filter = DefaultStreamFilter.instance;
+        long startOffset = Long.MAX_VALUE;
+        long endOffset = Long.MIN_VALUE;
+        long startTimestamp = Long.MAX_VALUE;
+        long endTimestamp = Long.MIN_VALUE;
+
+        for(StreamMessage message: streamMessages)
+        {
+            ParsedStreamMessage parsedStreamMessage = parser.parse(message);
+            if(filter.filter(parsedStreamMessage))
+            {
+                if (startOffset > parsedStreamMessage.getOffset()) {
+                    startOffset = parsedStreamMessage.getOffset();
+                }
+                if (endOffset < parsedStreamMessage.getOffset()) {
+                    endOffset = parsedStreamMessage.getOffset();
+                }
+                if (startTimestamp > parsedStreamMessage.getTimestamp()) {
+                    startTimestamp = parsedStreamMessage.getTimestamp();
+                }
+                if (endTimestamp < parsedStreamMessage.getTimestamp()) {
+                    endTimestamp = parsedStreamMessage.getTimestamp();
+                }
+                parsedStreamMessages.add(parsedStreamMessage.getStreamMessage());
+            }
+        }
+
+        MicroStreamBatch batch = new MicroStreamBatch(parsedStreamMessages, org.apache.kylin.common.util.Pair.newPair(startTimestamp, endTimestamp), org.apache.kylin.common.util.Pair.newPair(startOffset, endOffset));
+
+
         iiRows = Lists.newArrayList();
-        final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice(new MicroStreamBatch(streamMessages, org.apache.kylin.common.util.Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis()), org.apache.kylin.common.util.Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
+        final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));
         IIKeyValueCodec codec = new IIKeyValueCodec(slice.getInfo());
         for (IIRow iiRow : codec.encodeKeyValue(slice)) {
             iiRows.add(iiRow);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
index b6f5025..f0f3b6f 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -81,6 +81,18 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("parserName")
     private String parserName;
 
+    @JsonProperty("filterName")
+    private String filterName;
+
+    public String getFilterName() {
+        return filterName;
+    }
+
+    public void setFilterName(String filterName) {
+        this.filterName = filterName;
+    }
+
+
     public String getParserName() {
         return parserName;
     }