You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the mailing-list archives.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/06/15 01:53:36 UTC
kylin git commit: small refactor in StreamingParser
Repository: kylin
Updated Branches:
refs/heads/master 0a3541254 -> 5b4cbc4b6
small refactor in StreamingParser
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5b4cbc4b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5b4cbc4b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5b4cbc4b
Branch: refs/heads/master
Commit: 5b4cbc4b61e444093b88fdb4ba3d3afeba7468d3
Parents: 0a35412
Author: shaofengshi <sh...@apache.org>
Authored: Wed Jun 15 09:52:17 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Jun 15 09:52:47 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeSegment.java | 18 ++--
.../kylin/source/kafka/KafkaStreamingInput.java | 18 +++-
.../kylin/source/kafka/StreamingParser.java | 93 +++++++++++++-------
.../source/kafka/StringStreamingParser.java | 3 +-
.../source/kafka/TimedJsonStreamParser.java | 57 ++++--------
5 files changed, 112 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index cbcee14..f79e06d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -19,13 +19,10 @@
package org.apache.kylin.cube;
import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.Dictionary;
@@ -101,6 +98,10 @@ public class CubeSegment implements Comparable<CubeSegment>, IRealizationSegment
@JsonProperty("rowkey_stats")
private List<Object[]> rowkeyStats = Lists.newArrayList();
+ @JsonProperty("additionalInfo")
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ private HashMap<String, String> additionalInfo = new LinkedHashMap<String, String>();
+
private volatile Map<Long, Short> cuboidBaseShards = Maps.newHashMap();//cuboid id ==> base(starting) shard for this cuboid
public CubeDesc getCubeDesc() {
@@ -523,4 +524,11 @@ public class CubeSegment implements Comparable<CubeSegment>, IRealizationSegment
this.indexPath = indexPath;
}
+ public HashMap<String, String> getAdditionalInfo() {
+ return additionalInfo;
+ }
+
+ public void setAdditionalInfo(HashMap<String, String> additionalInfo) {
+ this.additionalInfo = additionalInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index c05119f..564c221 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.google.common.base.Function;
import kafka.cluster.Broker;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
@@ -49,11 +50,14 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.streaming.IStreamingInput;
import org.apache.kylin.common.util.StreamingBatch;
import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -64,6 +68,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
+
@SuppressWarnings("unused")
public class KafkaStreamingInput implements IStreamingInput {
@@ -88,10 +94,20 @@ public class KafkaStreamingInput implements IStreamingInput {
try {
final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
- final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig, realizationType, realizationName);
+ List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cube.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
+ @Nullable
+ @Override
+ public TblColRef apply(IntermediateColumnDesc input) {
+ return input.getColRef();
+ }
+ });
+
+ final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
final ExecutorService executorService = Executors.newCachedThreadPool();
final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
+
+
final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
for (int i = 0; i < partitionCount; ++i) {
final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 7b326e2..9075c77 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -36,54 +36,87 @@ package org.apache.kylin.source.kafka;
import java.lang.reflect.Constructor;
import java.util.List;
+import java.util.Set;
-import javax.annotation.Nullable;
-
-import kafka.message.MessageAndOffset;
-
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
/**
* By convention stream parsers should have a constructor with (List<TblColRef> allColumns, String propertiesStr) as params
*/
public abstract class StreamingParser {
+ public static final Set derivedTimeColumns = Sets.newHashSet();
+ static {
+ derivedTimeColumns.add("minute_start");
+ derivedTimeColumns.add("hour_start");
+ derivedTimeColumns.add("day_start");
+ derivedTimeColumns.add("week_start");
+ derivedTimeColumns.add("month_start");
+ derivedTimeColumns.add("quarter_start");
+ derivedTimeColumns.add("year_start");
+ }
+
+
/**
- * @param kafkaMessage
+ * @param message
* @return StreamingMessage must not be NULL
*/
- abstract public StreamingMessage parse(MessageAndOffset kafkaMessage);
+ abstract public StreamingMessage parse(Object message);
abstract public boolean filter(StreamingMessage streamingMessage);
- public static StreamingParser getStreamingParser(KafkaConfig kafkaConfig, RealizationType realizationType, String realizationName) throws ReflectiveOperationException {
- final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(realizationName);
- List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
- @Nullable
- @Override
- public TblColRef apply(IntermediateColumnDesc input) {
- return input.getColRef();
- }
- });
- if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
- Class clazz = Class.forName(kafkaConfig.getParserName());
+ public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException {
+ if (!StringUtils.isEmpty(parserName)) {
+ Class clazz = Class.forName(parserName);
Constructor constructor = clazz.getConstructor(List.class, String.class);
- return (StreamingParser) constructor.newInstance(columns, kafkaConfig.getParserProperties());
+ return (StreamingParser) constructor.newInstance(columns, parserProperties);
} else {
- throw new IllegalStateException("invalid StreamingConfig:" + kafkaConfig.getName() + " missing property StreamingParser");
+ throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + ".");
+ }
+ }
+
+ /**
+ * Calculate the derived time column value and put to the result list.
+ * @param columnName the column name, should be in lower case
+ * @param result the string list which representing a row
+ * @param t the timestamp that to calculate the derived time
+ * @return true if the columnName is a derived time column; otherwise false;
+ */
+ public static final boolean populateDerivedTimeColumns(String columnName, List<String> result, long t) {
+ if (derivedTimeColumns.contains(columnName) == false)
+ return false;
+
+ long normalized = 0;
+ if (columnName.equals("minute_start")) {
+ normalized = TimeUtil.getMinuteStart(t);
+ result.add(DateFormat.formatToTimeStr(normalized));
+ } else if (columnName.equals("hour_start")) {
+ normalized = TimeUtil.getHourStart(t);
+ result.add(DateFormat.formatToTimeStr(normalized));
+ } else if (columnName.equals("day_start")) {
+ //from day_start on, formatTs will output date format
+ normalized = TimeUtil.getDayStart(t);
+ result.add(DateFormat.formatToDateStr(normalized));
+ } else if (columnName.equals("week_start")) {
+ normalized = TimeUtil.getWeekStart(t);
+ result.add(DateFormat.formatToDateStr(normalized));
+ } else if (columnName.equals("month_start")) {
+ normalized = TimeUtil.getMonthStart(t);
+ result.add(DateFormat.formatToDateStr(normalized));
+ } else if (columnName.equals("quarter_start")) {
+ normalized = TimeUtil.getQuarterStart(t);
+ result.add(DateFormat.formatToDateStr(normalized));
+ } else if (columnName.equals("year_start")) {
+ normalized = TimeUtil.getYearStart(t);
+ result.add(DateFormat.formatToDateStr(normalized));
}
+
+ return true;
}
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index 9691ea7..5226899 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -55,7 +55,8 @@ public final class StringStreamingParser extends StreamingParser {
}
@Override
- public StreamingMessage parse(MessageAndOffset kafkaMessage) {
+ public StreamingMessage parse(Object message) {
+ MessageAndOffset kafkaMessage = (MessageAndOffset) message;
final ByteBuffer payload = kafkaMessage.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index e3075d5..63f5637 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -40,9 +40,7 @@ import java.util.*;
import kafka.message.MessageAndOffset;
import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +58,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
+
private List<TblColRef> allColumns;
private boolean formatTs = false;//not used
private final ObjectMapper mapper = new ObjectMapper();
@@ -75,14 +74,14 @@ public final class TimedJsonStreamParser extends StreamingParser {
String[] parts = prop.split("=");
if (parts.length == 2) {
switch (parts[0]) {
- case "formatTs":
- this.formatTs = Boolean.valueOf(parts[1]);
- break;
- case "tsColName":
- this.tsColName = parts[1];
- break;
- default:
- break;
+ case "formatTs":
+ this.formatTs = Boolean.valueOf(parts[1]);
+ break;
+ case "tsColName":
+ this.tsColName = parts[1];
+ break;
+ default:
+ break;
}
}
} catch (Exception e) {
@@ -96,14 +95,13 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
@Override
- public StreamingMessage parse(MessageAndOffset messageAndOffset) {
+ public StreamingMessage parse(Object msg) {
+ MessageAndOffset messageAndOffset = (MessageAndOffset) msg;
try {
Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
root.putAll(message);
String tsStr = root.get(tsColName);
- //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + //
- //" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));
long t;
if (StringUtils.isEmpty(tsStr)) {
t = 0;
@@ -112,38 +110,16 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
ArrayList<String> result = Lists.newArrayList();
- long normalized = 0;
for (TblColRef column : allColumns) {
- String columnName = column.getName();
- if (columnName.equalsIgnoreCase("minute_start")) {
- normalized = TimeUtil.getMinuteStart(t);
- result.add(DateFormat.formatToTimeStr(normalized));
- } else if (columnName.equalsIgnoreCase("hour_start")) {
- normalized = TimeUtil.getHourStart(t);
- result.add(DateFormat.formatToTimeStr(normalized));
- } else if (columnName.equalsIgnoreCase("day_start")) {
- //from day_start on, formatTs will output date format
- normalized = TimeUtil.getDayStart(t);
- result.add(DateFormat.formatToDateStr(normalized));
- } else if (columnName.equalsIgnoreCase("week_start")) {
- normalized = TimeUtil.getWeekStart(t);
- result.add(DateFormat.formatToDateStr(normalized));
- } else if (columnName.equalsIgnoreCase("month_start")) {
- normalized = TimeUtil.getMonthStart(t);
- result.add(DateFormat.formatToDateStr(normalized));
- } else if (columnName.equalsIgnoreCase("quarter_start")) {
- normalized = TimeUtil.getQuarterStart(t);
- result.add(DateFormat.formatToDateStr(normalized));
- } else if (columnName.equalsIgnoreCase("year_start")) {
- normalized = TimeUtil.getYearStart(t);
- result.add(DateFormat.formatToDateStr(normalized));
- } else {
- String x = root.get(columnName.toLowerCase());
+ String columnName = column.getName().toLowerCase();
+
+ if (populateDerivedTimeColumns(columnName, result, t) == false) {
+ String x = root.get(columnName);
result.add(x);
}
}
- return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object> emptyMap());
+ return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap());
} catch (IOException e) {
logger.error("error", e);
@@ -151,6 +127,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
}
}
+
@Override
public boolean filter(StreamingMessage streamingMessage) {
return true;