You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/06/02 04:36:13 UTC
incubator-kylin git commit: refactor StreamBuilder to support parse and filter
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 d27319b58 -> ad27ae28f
refactor StreamBuilder to support parse and filter
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ad27ae28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ad27ae28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ad27ae28
Branch: refs/heads/0.8.0
Commit: ad27ae28fae82f22c3fa64aac4c0a971b0cc3a55
Parents: d27319b
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Jun 2 00:55:33 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Jun 2 10:36:30 2015 +0800
----------------------------------------------------------------------
.../kylin/job/streaming/CubeStreamBuilder.java | 29 ++----
.../kylin/streaming/DefaultStreamFilter.java | 15 +++
.../kylin/streaming/JsonStreamParser.java | 4 +-
.../kylin/streaming/MicroStreamBatch.java | 38 +++++++
.../kylin/streaming/ParsedStreamMessage.java | 32 ++++++
.../kylin/streaming/SEOJsonStreamParser.java | 100 -------------------
.../apache/kylin/streaming/StreamBuilder.java | 86 +++++++++++-----
.../apache/kylin/streaming/StreamFilter.java | 8 ++
.../apache/kylin/streaming/StreamParser.java | 4 +-
.../kylin/streaming/StringStreamParser.java | 6 +-
.../invertedindex/IIStreamBuilder.java | 13 +--
.../streaming/invertedindex/SliceBuilder.java | 14 +--
12 files changed, 178 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 6678da9..79fd674 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -20,14 +20,9 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.persistence.HBaseConnection;
@@ -54,7 +49,6 @@ import org.apache.kylin.dict.lookup.TableSignature;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
import org.apache.kylin.job.hadoop.hbase.CubeHTableUtil;
import org.apache.kylin.job.inmemcubing.ICuboidWriter;
import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
@@ -62,7 +56,7 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.streaming.SEOJsonStreamParser;
+import org.apache.kylin.streaming.MicroStreamBatch;
import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamMessage;
import org.slf4j.Logger;
@@ -94,18 +88,17 @@ public class CubeStreamBuilder extends StreamBuilder {
this.kylinConfig = KylinConfig.getInstanceFromEnv();
this.cubeManager = CubeManager.getInstance(kylinConfig);
this.cubeName = cubeName;
- setStreamParser(new SEOJsonStreamParser(cubeManager.getCube(cubeName).getAllColumns()));
}
@Override
- protected void build(List<StreamMessage> streamMessages) throws Exception {
- if (CollectionUtils.isEmpty(streamMessages)) {
+ protected void build(MicroStreamBatch microStreamBatch) throws Exception {
+ if (microStreamBatch.size() == 0) {
logger.info("nothing to build, skip to next iteration");
return;
}
- final List<List<String>> parsedStreamMessages = parseStream(streamMessages);
- long startOffset = streamMessages.get(0).getOffset();
- long endOffset = streamMessages.get(streamMessages.size() - 1).getOffset();
+ final List<List<String>> parsedStreamMessages = microStreamBatch.getStreams();
+ long startOffset = microStreamBatch.getOffset().getFirst();
+ long endOffset = microStreamBatch.getOffset().getSecond();
LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
blockingQueue.put(Collections.<String>emptyList());
@@ -374,16 +367,6 @@ public class CubeStreamBuilder extends StreamBuilder {
}
- private List<List<String>> parseStream(List<StreamMessage> streamMessages) {
- return Lists.transform(streamMessages, new Function<StreamMessage, List<String>>() {
- @Nullable
- @Override
- public List<String> apply(StreamMessage input) {
- return getStreamParser().parse(input);
- }
- });
- }
-
private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
final String hTableName = cubeSegment.getStorageLocationIdentifier();
CubeHTableUtil.createHTable(cubeSegment.getCubeDesc(), hTableName, null);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/DefaultStreamFilter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/DefaultStreamFilter.java b/streaming/src/main/java/org/apache/kylin/streaming/DefaultStreamFilter.java
new file mode 100644
index 0000000..875ae4f
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/DefaultStreamFilter.java
@@ -0,0 +1,15 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public final class DefaultStreamFilter implements StreamFilter {
+
+ public static final DefaultStreamFilter instance = new DefaultStreamFilter();
+
+ private DefaultStreamFilter() {}
+
+ @Override
+ public boolean filter(ParsedStreamMessage streamMessage) {
+ return streamMessage != null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
index 7d730b7..78de231 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
@@ -64,7 +64,7 @@ public final class JsonStreamParser implements StreamParser {
}
@Override
- public List<String> parse(StreamMessage streamMessage) {
+ public ParsedStreamMessage parse(StreamMessage streamMessage) {
try {
Map<String, String> json = objectMapper.readValue(streamMessage.getRawData(), javaType);
ArrayList<String> result = Lists.newArrayList();
@@ -75,7 +75,7 @@ public final class JsonStreamParser implements StreamParser {
}
}
}
- return result;
+ return new ParsedStreamMessage(result, streamMessage.getOffset(), streamMessage.getOffset());
} catch (IOException e) {
logger.error("error parsing stream", e);
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
new file mode 100644
index 0000000..0adcee2
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -0,0 +1,38 @@
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.common.util.Pair;
+
+import java.util.List;
+
+/**
+ */
+public final class MicroStreamBatch {
+
+ private final List<List<String>> streams;
+
+ private final Pair<Long, Long> timestamp;
+
+ private final Pair<Long, Long> offset;
+
+ public MicroStreamBatch(List<List<String>> streams, Pair<Long, Long> timestamp, Pair<Long, Long> offset) {
+ this.streams = streams;
+ this.timestamp = timestamp;
+ this.offset = offset;
+ }
+
+ public final List<List<String>> getStreams() {
+ return streams;
+ }
+
+ public final Pair<Long, Long> getTimestamp() {
+ return timestamp;
+ }
+
+ public final Pair<Long, Long> getOffset() {
+ return offset;
+ }
+
+ public final int size() {
+ return streams.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/ParsedStreamMessage.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/ParsedStreamMessage.java b/streaming/src/main/java/org/apache/kylin/streaming/ParsedStreamMessage.java
new file mode 100644
index 0000000..1ebe506
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/ParsedStreamMessage.java
@@ -0,0 +1,32 @@
+package org.apache.kylin.streaming;
+
+import java.util.List;
+
+/**
+ */
+public class ParsedStreamMessage {
+
+ private final List<String> streamMessage;
+
+ private long offset;
+
+ private long timestamp;
+
+ public ParsedStreamMessage(List<String> streamMessage, long offset, long timestamp) {
+ this.streamMessage = streamMessage;
+ this.offset = offset;
+ this.timestamp = timestamp;
+ }
+
+ public final List<String> getStreamMessage() {
+ return streamMessage;
+ }
+
+ public final long getOffset() {
+ return offset;
+ }
+
+ public final long getTimestamp() {
+ return timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
deleted file mode 100644
index 5dab7f9..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/SEOJsonStreamParser.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *
- * contributor license agreements. See the NOTICE file distributed with
- *
- * this work for additional information regarding copyright ownership.
- *
- * The ASF licenses this file to You under the Apache License, Version 2.0
- *
- * (the "License"); you may not use this file except in compliance with
- *
- * the License. You may obtain a copy of the License at
- *
- *
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- *
- * distributed under the License is distributed on an "AS IS" BASIS,
- *
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- * See the License for the specific language governing permissions and
- *
- * limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public final class SEOJsonStreamParser implements StreamParser {
-
- private static final Logger logger = LoggerFactory.getLogger(SEOJsonStreamParser.class);
-
- private final List<TblColRef> allColumns;
- private final ObjectMapper mapper = new ObjectMapper();
- private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
-
- public SEOJsonStreamParser(List<TblColRef> allColumns) {
- this.allColumns = allColumns;
- }
-
- @Override
- public List<String> parse(StreamMessage stream) {
- try {
- Map<String, String> root = mapper.readValue(stream.getRawData(), mapType);
- String trafficSource = root.get("trafficsourceid");
- if ("20".equals(trafficSource) || "21".equals(trafficSource) || "22".equals(trafficSource) || "23".equals(trafficSource)) {
- ArrayList<String> result = Lists.newArrayList();
- for (TblColRef column : allColumns) {
- String columnName = column.getName();
- if (columnName.equalsIgnoreCase("minute_start")) {
- result.add(String.valueOf(TimeUtil.getMinuteStart(Long.valueOf(root.get("timestamp")))));
- } else if (columnName.equalsIgnoreCase("hour_start")) {
- result.add(String.valueOf(TimeUtil.getHourStart(Long.valueOf(root.get("timestamp")))));
- } else if (columnName.equalsIgnoreCase("day")) {
- //of day start we'll add yyyy-mm-dd
- long ts = TimeUtil.getDayStart(Long.valueOf(root.get("timestamp")));
- result.add(DateFormat.formatToDateStr(ts));
- } else {
- String x = root.get(columnName.toLowerCase());
- result.add(x);
- }
- }
-
- return result;
- } else {
- return null;
- }
- } catch (IOException e) {
- logger.error("error parsing:" + new String(stream.getRawData()), e);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index e9cb046..cb5dc1d 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -35,6 +35,8 @@
package org.apache.kylin.streaming;
import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kylin.common.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,57 +53,87 @@ public abstract class StreamBuilder implements Runnable {
private StreamParser streamParser = StringStreamParser.instance;
+ private StreamFilter streamFilter = DefaultStreamFilter.instance;
+
private BlockingQueue<StreamMessage> streamMessageQueue;
private long lastBuildTime = System.currentTimeMillis();
+ private long startOffset;
+ private long endOffset;
+
+ private long startTimestamp;
+ private long endTimestamp;
+
public StreamBuilder(BlockingQueue<StreamMessage> streamMessageQueue) {
this.streamMessageQueue = streamMessageQueue;
}
- protected abstract void build(List<StreamMessage> streamsToBuild) throws Exception;
+ protected abstract void build(MicroStreamBatch microStreamBatch) throws Exception;
protected abstract void onStop();
private void clearCounter() {
lastBuildTime = System.currentTimeMillis();
+ startOffset = Long.MAX_VALUE;
+ endOffset = Long.MIN_VALUE;
+ startTimestamp = Long.MAX_VALUE;
+ endTimestamp = Long.MIN_VALUE;
}
@Override
public void run() {
try {
- List<StreamMessage> streamMessageToBuild = Lists.newArrayList();
- clearCounter();
+ List<List<String>> parsedStreamMessages = null;
while (true) {
+ if (parsedStreamMessages == null) {
+ parsedStreamMessages = Lists.newLinkedList();
+ clearCounter();
+ }
StreamMessage streamMessage;
try {
streamMessage = streamMessageQueue.poll(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- logger.warn("stream queue interrupted", e);
+ logger.warn("stream queue should not be interrupted", e);
continue;
}
- if (streamMessage == null || getStreamParser().parse(streamMessage) == null) {
- if (streamMessage == null) {
- logger.info("The stream queue is drained, current available stream count: " + streamMessageToBuild.size());
- }
- if ((System.currentTimeMillis() - lastBuildTime) > batchInterval()) {
- build(streamMessageToBuild);
- clearCounter();
- streamMessageToBuild.clear();
+ if (streamMessage == null) {
+ logger.info("The stream queue is drained, current available stream count: " + parsedStreamMessages.size());
+ if ((System.currentTimeMillis() - lastBuildTime) > batchInterval() && !parsedStreamMessages.isEmpty()) {
+ build(new MicroStreamBatch(parsedStreamMessages, Pair.newPair(startTimestamp, endTimestamp), Pair.newPair(startOffset, endOffset)));
+ parsedStreamMessages = null;
}
continue;
- } else {
- if (streamMessage.getOffset() < 0) {
- onStop();
- logger.warn("streaming encountered EOF, stop building");
- break;
- }
}
- streamMessageToBuild.add(streamMessage);
- if (streamMessageToBuild.size() >= batchSize()) {
- build(streamMessageToBuild);
- clearCounter();
- streamMessageToBuild.clear();
+ if (streamMessage.getOffset() < 0) {
+ onStop();
+ logger.warn("streaming encountered EOF, stop building");
+ break;
}
+
+ final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
+
+ if (getStreamFilter().filter(parsedStreamMessage)) {
+ if (startOffset > parsedStreamMessage.getOffset()) {
+ startOffset = parsedStreamMessage.getOffset();
+ }
+ if (endOffset < parsedStreamMessage.getOffset()) {
+ endOffset = parsedStreamMessage.getOffset();
+ }
+ if (startTimestamp > parsedStreamMessage.getTimestamp()) {
+ startTimestamp = parsedStreamMessage.getTimestamp();
+ }
+ if (endTimestamp < parsedStreamMessage.getTimestamp()) {
+ endTimestamp = parsedStreamMessage.getTimestamp();
+ }
+ parsedStreamMessages.add(parsedStreamMessage.getStreamMessage());
+ if (parsedStreamMessages.size() >= batchSize()) {
+ build(new MicroStreamBatch(parsedStreamMessages, Pair.newPair(startTimestamp, endTimestamp), Pair.newPair(startOffset, endOffset)));
+ parsedStreamMessages = null;
+ }
+ } else {
+ //ignore unfiltered stream message
+ }
+
}
} catch (Exception e) {
logger.error("build stream error, stop building", e);
@@ -117,6 +149,14 @@ public abstract class StreamBuilder implements Runnable {
this.streamParser = streamParser;
}
+ public final StreamFilter getStreamFilter() {
+ return streamFilter;
+ }
+
+ public final void setStreamFilter(StreamFilter streamFilter) {
+ this.streamFilter = streamFilter;
+ }
+
protected abstract int batchInterval();
protected abstract int batchSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/StreamFilter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamFilter.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamFilter.java
new file mode 100644
index 0000000..e590320
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamFilter.java
@@ -0,0 +1,8 @@
+package org.apache.kylin.streaming;
+
+/**
+ */
+public interface StreamFilter {
+
+ boolean filter(ParsedStreamMessage streamMessage);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
index 304e252..f541f36 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
@@ -34,11 +34,9 @@
package org.apache.kylin.streaming;
-import java.util.List;
-
/**
*/
public interface StreamParser {
- List<String> parse(StreamMessage streamMessage);
+ ParsedStreamMessage parse(StreamMessage streamMessage);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
index 14e1b45..f0e335b 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
@@ -34,8 +34,6 @@
package org.apache.kylin.streaming;
-import java.util.List;
-
import com.google.common.collect.Lists;
/**
@@ -46,7 +44,7 @@ public final class StringStreamParser implements StreamParser {
private StringStreamParser(){}
@Override
- public List<String> parse(StreamMessage streamMessage) {
- return Lists.newArrayList(new String(streamMessage.getRawData()).split(","));
+ public ParsedStreamMessage parse(StreamMessage streamMessage) {
+ return new ParsedStreamMessage(Lists.newArrayList(new String(streamMessage.getRawData()).split(",")), streamMessage.getOffset(), streamMessage.getOffset());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 56435bd..cf86b44 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -45,6 +45,7 @@ import org.apache.kylin.invertedindex.index.Slice;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.streaming.MicroStreamBatch;
import org.apache.kylin.streaming.StreamMessage;
import org.apache.kylin.streaming.StreamBuilder;
import org.apache.kylin.streaming.StreamingManager;
@@ -89,23 +90,23 @@ public class IIStreamBuilder extends StreamBuilder {
}
@Override
- protected void build(List<StreamMessage> streamsToBuild) throws IOException {
- if (streamsToBuild.size() > 0) {
- long offset = streamsToBuild.get(0).getOffset();
+ protected void build(MicroStreamBatch microStreamBatch) throws IOException {
+ if (microStreamBatch.size() > 0) {
+ long offset = microStreamBatch.getOffset().getFirst();
if (offset < streamingManager.getOffset(streaming, shardId)) {
logger.info("this batch has already been built, skip building");
return;
}
- logger.info("stream build start, size:" + streamsToBuild.size());
+ logger.info("stream build start, size:" + microStreamBatch.size());
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
- final Slice slice = sliceBuilder.buildSlice(streamsToBuild, getStreamParser());
+ final Slice slice = sliceBuilder.buildSlice(microStreamBatch);
logger.info("slice info, shard:" + slice.getShard() + " timestamp:" + slice.getTimestamp() + " record count:" + slice.getRecordCount());
loadToHBase(hTable, slice, new IIKeyValueCodec(slice.getInfo()));
submitOffset(offset);
stopwatch.stop();
- logger.info("stream build finished, size:" + streamsToBuild.size() + " elapsed time:" + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + " " + TimeUnit.MILLISECONDS);
+ logger.info("stream build finished, size:" + microStreamBatch.size() + " elapsed time:" + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + " " + TimeUnit.MILLISECONDS);
} else {
logger.info("nothing to build, skip building");
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ad27ae28/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
index b6769a9..604bf9c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
@@ -43,6 +43,7 @@ import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
+import org.apache.kylin.streaming.MicroStreamBatch;
import org.apache.kylin.streaming.StreamMessage;
import org.apache.kylin.streaming.StreamParser;
import org.slf4j.Logger;
@@ -68,17 +69,10 @@ public final class SliceBuilder {
}
- public Slice buildSlice(List<StreamMessage> streamMessages, final StreamParser streamParser) {
- List<List<String>> table = Lists.transform(streamMessages, new Function<StreamMessage, List<String>>() {
- @Nullable
- @Override
- public List<String> apply(@Nullable StreamMessage input) {
- return streamParser.parse(input);
- }
- });
- final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(table, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
+ public Slice buildSlice(MicroStreamBatch microStreamBatch) {
+ final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(microStreamBatch.getStreams(), iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
- return build(table, tableRecordInfo, dictionaries);
+ return build(microStreamBatch.getStreams(), tableRecordInfo, dictionaries);
}
private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {