You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/06/02 08:18:43 UTC
[1/2] incubator-kylin git commit: KYLIN-801 use new ehcache conf
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 61c7115d4 -> 26e4ff7e0
KYLIN-801 use new ehcache conf
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8cebe49d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8cebe49d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8cebe49d
Branch: refs/heads/0.8.0
Commit: 8cebe49d7c2e726fc4dcbb42474a8f620a22ff5a
Parents: 61c7115
Author: honma <ho...@ebay.com>
Authored: Tue Jun 2 11:10:20 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 2 14:14:17 2015 +0800
----------------------------------------------------------------------
.../test_kylin_cube_with_slr_desc.json | 1 -
...test_kylin_cube_with_slr_left_join_desc.json | 1 -
.../test_kylin_cube_without_slr_desc.json | 1 -
...t_kylin_cube_without_slr_left_join_desc.json | 1 -
.../kylin/job/streaming/CubeStreamBuilder.java | 42 +++++++++-----------
server/src/main/resources/ehcache-test.xml | 15 ++++---
server/src/main/resources/ehcache.xml | 15 ++++---
7 files changed, 36 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
index fecb65b..eeaa049 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
@@ -150,7 +150,6 @@
} ],
"aggregation_groups" : [ [ "leaf_categ_id", "meta_categ_name", "categ_lvl2_name", "categ_lvl3_name", "cal_dt" ] ]
},
- "signature" : "lsLAl2jL62ZApmOLZqWU3g==",
"last_modified" : 1422435345330,
"model_name" : "test_kylin_inner_join_model_desc",
"null_string" : null,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
index e06ce27..720c87f 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_left_join_desc.json
@@ -150,7 +150,6 @@
} ],
"aggregation_groups" : [ [ "leaf_categ_id", "meta_categ_name", "categ_lvl2_name", "categ_lvl3_name", "cal_dt" ] ]
},
- "signature" : "ljba0vaTnt00lU4rdhG9Xw==",
"last_modified" : 1422435345352,
"model_name" : "test_kylin_left_join_model_desc",
"null_string" : null,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
index 9534de4..4d75837 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
@@ -174,7 +174,6 @@
} ],
"aggregation_groups" : [ [ "lstg_format_name", "lstg_site_id", "slr_segment_cd" ], [ "leaf_categ_id", "meta_categ_name", "categ_lvl3_name", "categ_lvl2_name", "lstg_format_name" ] ]
},
- "signature" : "7qJiv2MEGoGf1AAgksQmZw==",
"last_modified" : 1422435345362,
"model_name" : "test_kylin_inner_join_model_desc",
"null_string" : null,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
index 1e2fc9e..06fb7b8 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
@@ -174,7 +174,6 @@
} ],
"aggregation_groups" : [ [ "lstg_format_name", "lstg_site_id", "slr_segment_cd" ], [ "leaf_categ_id", "meta_categ_name", "categ_lvl3_name", "categ_lvl2_name", "lstg_format_name" ] ]
},
- "signature" : "TPdd/nnYLZnsI8TtZzoCng==",
"last_modified" : 1422435345373,
"model_name" : "test_kylin_left_join_model_desc",
"null_string" : null,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 79fd674..3986db6 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -1,21 +1,13 @@
package org.apache.kylin.job.streaming;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -62,14 +54,14 @@ import org.apache.kylin.streaming.StreamMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
/**
*/
@@ -342,12 +334,14 @@ public class CubeStreamBuilder extends StreamBuilder {
return result;
}
+ //TODO: should we use cubeManager.promoteNewlyBuiltSegments?
private void commitSegment(CubeSegment cubeSegment) throws IOException {
cubeSegment.setStatus(SegmentStatusEnum.READY);
CubeInstance cube = CubeManager.getInstance(kylinConfig).reloadCubeLocal(cubeSegment.getCubeInstance().getName());
cube.getSegments().add(cubeSegment);
Collections.sort(cube.getSegments());
+
CubeManager.getInstance(kylinConfig).updateCube(cube, false);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/server/src/main/resources/ehcache-test.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/ehcache-test.xml b/server/src/main/resources/ehcache-test.xml
index 43940e0..8ee2246 100644
--- a/server/src/main/resources/ehcache-test.xml
+++ b/server/src/main/resources/ehcache-test.xml
@@ -1,23 +1,26 @@
<ehcache maxBytesLocalHeap="10M">>
<cache name="StorageCache"
eternal="false"
- overflowToDisk="false"
timeToIdleSeconds="86400"
memoryStoreEvictionPolicy="LRU"
maxBytesLocalHeap="1M"
- />
+ >
+ <persistence strategy="none"/>
+ </cache>
<cache name="ExceptionQueryCache"
eternal="false"
- overflowToDisk="false"
timeToIdleSeconds="86400"
memoryStoreEvictionPolicy="LRU"
maxBytesLocalHeap="1M"
- />
+ >
+ <persistence strategy="none"/>
+ </cache>
<cache name="UserCache"
eternal="false"
- overflowToDisk="false"
timeToLiveSeconds="10800"
memoryStoreEvictionPolicy="LRU"
maxBytesLocalHeap="1M"
- />
+ >
+ <persistence strategy="none"/>
+ </cache>
</ehcache>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8cebe49d/server/src/main/resources/ehcache.xml
----------------------------------------------------------------------
diff --git a/server/src/main/resources/ehcache.xml b/server/src/main/resources/ehcache.xml
index ece1845..f8fc68d 100644
--- a/server/src/main/resources/ehcache.xml
+++ b/server/src/main/resources/ehcache.xml
@@ -1,23 +1,26 @@
<ehcache maxBytesLocalHeap="2048M">
<cache name="StorageCache"
eternal="false"
- overflowToDisk="false"
timeToIdleSeconds="86400"
memoryStoreEvictionPolicy="LRU"
maxBytesLocalHeap="500M"
- />
+ >
+ <persistence strategy="none"/>
+ </cache>
<cache name="ExceptionQueryCache"
eternal="false"
- overflowToDisk="false"
timeToIdleSeconds="86400"
memoryStoreEvictionPolicy="LRU"
maxBytesLocalHeap="100M"
- />
+ >
+ <persistence strategy="none"/>
+ </cache>
<cache name="UserCache"
eternal="false"
- overflowToDisk="false"
timeToLiveSeconds="10800"
memoryStoreEvictionPolicy="LRU"
maxBytesLocalHeap="100M"
- />
+ >
+ <persistence strategy="none"/>
+ </cache>
</ehcache>
\ No newline at end of file
[2/2] incubator-kylin git commit: StreamBuilder supports stream parser and
filter conditions.
Posted by ma...@apache.org.
StreamBuilder supports stream parser and filter conditions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/26e4ff7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/26e4ff7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/26e4ff7e
Branch: refs/heads/0.8.0
Commit: 26e4ff7e0a51822bd85249765aa158b57a2175d5
Parents: 8cebe49
Author: honma <ho...@ebay.com>
Authored: Tue Jun 2 14:14:05 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Tue Jun 2 14:18:34 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/StreamingBootstrap.java | 10 +++++
.../kylin/job/hadoop/invertedindex/IITest.java | 44 ++++++++++++++++----
.../org/apache/kylin/streaming/KafkaConfig.java | 12 ++++++
3 files changed, 59 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 0c67d78..ee00880 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -166,6 +166,7 @@ public class StreamingBootstrap {
});
CubeStreamBuilder cubeStreamBuilder = new CubeStreamBuilder(streamQueue, cubeName);
cubeStreamBuilder.setStreamParser(getStreamParser(kafkaConfig, cubeInstance.getAllColumns()));
+ cubeStreamBuilder.setStreamFilter(getStreamFilter(kafkaConfig));
final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
future.get();
}
@@ -180,6 +181,15 @@ public class StreamingBootstrap {
}
}
+ private StreamFilter getStreamFilter(KafkaConfig kafkaConfig) throws Exception {
+ if (!StringUtils.isEmpty(kafkaConfig.getFilterName())) {
+ Class clazz = Class.forName(kafkaConfig.getFilterName());
+ return (StreamFilter) clazz.newInstance();
+ } else {
+ return DefaultStreamFilter.instance;
+ }
+ }
+
private void startIIStreaming(KafkaConfig kafkaConfig, final int partitionId, final int partitionCount) throws Exception {
final IIInstance ii = IIManager.getInstance(this.kylinConfig).getII(kafkaConfig.getIiName());
Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 390017f..7915080 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -41,9 +41,7 @@ import org.apache.kylin.storage.hbase.coprocessor.endpoint.ClearTextDictionary;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointAggregators;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint;
import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.MicroStreamBatch;
-import org.apache.kylin.streaming.StreamMessage;
-import org.apache.kylin.streaming.StringStreamParser;
+import org.apache.kylin.streaming.*;
import org.apache.kylin.streaming.invertedindex.SliceBuilder;
import org.junit.After;
import org.junit.Assert;
@@ -76,16 +74,48 @@ public class IITest extends LocalFileMetadataTestCase {
this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
this.iiDesc = ii.getDescriptor();
- List<List<String>> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, List<String>>() {
+ List<StreamMessage> streamMessages = Lists.transform(Arrays.asList(inputData), new Function<String, StreamMessage>() {
@Nullable
@Override
- public List<String> apply(@Nullable String input) {
- return StringStreamParser.instance.parse(new StreamMessage(System.currentTimeMillis(), input.getBytes())).getStreamMessage();
+ public StreamMessage apply(String input) {
+ return new StreamMessage(System.currentTimeMillis(), input.getBytes());
}
});
+ List<List<String>> parsedStreamMessages = Lists.newArrayList();
+ StreamParser parser = StringStreamParser.instance;
+ StreamFilter filter = DefaultStreamFilter.instance;
+ long startOffset = Long.MAX_VALUE;
+ long endOffset = Long.MIN_VALUE;
+ long startTimestamp = Long.MAX_VALUE;
+ long endTimestamp = Long.MIN_VALUE;
+
+ for(StreamMessage message: streamMessages)
+ {
+ ParsedStreamMessage parsedStreamMessage = parser.parse(message);
+ if(filter.filter(parsedStreamMessage))
+ {
+ if (startOffset > parsedStreamMessage.getOffset()) {
+ startOffset = parsedStreamMessage.getOffset();
+ }
+ if (endOffset < parsedStreamMessage.getOffset()) {
+ endOffset = parsedStreamMessage.getOffset();
+ }
+ if (startTimestamp > parsedStreamMessage.getTimestamp()) {
+ startTimestamp = parsedStreamMessage.getTimestamp();
+ }
+ if (endTimestamp < parsedStreamMessage.getTimestamp()) {
+ endTimestamp = parsedStreamMessage.getTimestamp();
+ }
+ parsedStreamMessages.add(parsedStreamMessage.getStreamMessage());
+ }
+ }
+
+ MicroStreamBatch batch = new MicroStreamBatch(parsedStreamMessages, org.apache.kylin.common.util.Pair.newPair(startTimestamp, endTimestamp), org.apache.kylin.common.util.Pair.newPair(startOffset, endOffset));
+
+
iiRows = Lists.newArrayList();
- final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice(new MicroStreamBatch(streamMessages, org.apache.kylin.common.util.Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis()), org.apache.kylin.common.util.Pair.newPair(System.currentTimeMillis(), System.currentTimeMillis())));
+ final Slice slice = new SliceBuilder(iiDesc, (short) 0, true).buildSlice((batch));
IIKeyValueCodec codec = new IIKeyValueCodec(slice.getInfo());
for (IIRow iiRow : codec.encodeKeyValue(slice)) {
iiRows.add(iiRow);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/26e4ff7e/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
index b6f5025..f0f3b6f 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -81,6 +81,18 @@ public class KafkaConfig extends RootPersistentEntity {
@JsonProperty("parserName")
private String parserName;
+ @JsonProperty("filterName")
+ private String filterName;
+
+ public String getFilterName() {
+ return filterName;
+ }
+
+ public void setFilterName(String filterName) {
+ this.filterName = filterName;
+ }
+
+
public String getParserName() {
return parserName;
}