You are viewing a plain text version of this content. The canonical HTML version is available in the Apache mailing list archives.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/06/15 01:53:36 UTC

kylin git commit: small refactor in StreamingParser

Repository: kylin
Updated Branches:
  refs/heads/master 0a3541254 -> 5b4cbc4b6


small refactor in StreamingParser

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5b4cbc4b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5b4cbc4b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5b4cbc4b

Branch: refs/heads/master
Commit: 5b4cbc4b61e444093b88fdb4ba3d3afeba7468d3
Parents: 0a35412
Author: shaofengshi <sh...@apache.org>
Authored: Wed Jun 15 09:52:17 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Jun 15 09:52:47 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeSegment.java | 18 ++--
 .../kylin/source/kafka/KafkaStreamingInput.java | 18 +++-
 .../kylin/source/kafka/StreamingParser.java     | 93 +++++++++++++-------
 .../source/kafka/StringStreamingParser.java     |  3 +-
 .../source/kafka/TimedJsonStreamParser.java     | 57 ++++--------
 5 files changed, 112 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index cbcee14..f79e06d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -19,13 +19,10 @@
 package org.apache.kylin.cube;
 
 import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.Dictionary;
@@ -101,6 +98,10 @@ public class CubeSegment implements Comparable<CubeSegment>, IRealizationSegment
     @JsonProperty("rowkey_stats")
     private List<Object[]> rowkeyStats = Lists.newArrayList();
 
+    @JsonProperty("additionalInfo")
+    @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    private HashMap<String, String> additionalInfo = new LinkedHashMap<String, String>();
+
     private volatile Map<Long, Short> cuboidBaseShards = Maps.newHashMap();//cuboid id ==> base(starting) shard for this cuboid
 
     public CubeDesc getCubeDesc() {
@@ -523,4 +524,11 @@ public class CubeSegment implements Comparable<CubeSegment>, IRealizationSegment
         this.indexPath = indexPath;
     }
 
+    public HashMap<String, String> getAdditionalInfo() {
+        return additionalInfo;
+    }
+
+    public void setAdditionalInfo(HashMap<String, String> additionalInfo) {
+        this.additionalInfo = additionalInfo;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index c05119f..564c221 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import com.google.common.base.Function;
 import kafka.cluster.Broker;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.PartitionMetadata;
@@ -49,11 +50,14 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.streaming.IStreamingInput;
 import org.apache.kylin.common.util.StreamingBatch;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -64,6 +68,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
+import javax.annotation.Nullable;
+
 @SuppressWarnings("unused")
 public class KafkaStreamingInput implements IStreamingInput {
 
@@ -88,10 +94,20 @@ public class KafkaStreamingInput implements IStreamingInput {
             try {
                 final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
                 final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
-                final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig, realizationType, realizationName);
+                List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cube.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
+                    @Nullable
+                    @Override
+                    public TblColRef apply(IntermediateColumnDesc input) {
+                        return input.getColRef();
+                    }
+                });
+
+                final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns);
                 final ExecutorService executorService = Executors.newCachedThreadPool();
                 final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
                 for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
+
+
                     final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
                     for (int i = 0; i < partitionCount; ++i) {
                         final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 7b326e2..9075c77 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -36,54 +36,87 @@ package org.apache.kylin.source.kafka;
 
 import java.lang.reflect.Constructor;
 import java.util.List;
+import java.util.Set;
 
-import javax.annotation.Nullable;
-
-import kafka.message.MessageAndOffset;
-
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
 
 /**
  * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, String propertiesStr) as params
  */
 public abstract class StreamingParser {
 
+    public static final Set derivedTimeColumns = Sets.newHashSet();
+    static {
+        derivedTimeColumns.add("minute_start");
+        derivedTimeColumns.add("hour_start");
+        derivedTimeColumns.add("day_start");
+        derivedTimeColumns.add("week_start");
+        derivedTimeColumns.add("month_start");
+        derivedTimeColumns.add("quarter_start");
+        derivedTimeColumns.add("year_start");
+    }
+
+
     /**
-     * @param kafkaMessage
+     * @param message
      * @return StreamingMessage must not be NULL
      */
-    abstract public StreamingMessage parse(MessageAndOffset kafkaMessage);
+    abstract public StreamingMessage parse(Object message);
 
     abstract public boolean filter(StreamingMessage streamingMessage);
 
-    public static StreamingParser getStreamingParser(KafkaConfig kafkaConfig, RealizationType realizationType, String realizationName) throws ReflectiveOperationException {
-        final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(realizationName);
-        List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
-            @Nullable
-            @Override
-            public TblColRef apply(IntermediateColumnDesc input) {
-                return input.getColRef();
-            }
-        });
-        if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
-            Class clazz = Class.forName(kafkaConfig.getParserName());
+    public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException {
+        if (!StringUtils.isEmpty(parserName)) {
+            Class clazz = Class.forName(parserName);
             Constructor constructor = clazz.getConstructor(List.class, String.class);
-            return (StreamingParser) constructor.newInstance(columns, kafkaConfig.getParserProperties());
+            return (StreamingParser) constructor.newInstance(columns, parserProperties);
         } else {
-            throw new IllegalStateException("invalid StreamingConfig:" + kafkaConfig.getName() + " missing property StreamingParser");
+            throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + ".");
+        }
+    }
+
+    /**
+     * Calculate the derived time column value and put to the result list.
+     * @param columnName the column name, should be in lower case
+     * @param result the string list which representing a row
+     * @param t the timestamp that to calculate the derived time
+     * @return true if the columnName is a derived time column; otherwise false;
+     */
+    public static final boolean populateDerivedTimeColumns(String columnName, List<String> result, long t) {
+        if (derivedTimeColumns.contains(columnName) == false)
+            return false;
+
+        long normalized = 0;
+        if (columnName.equals("minute_start")) {
+            normalized = TimeUtil.getMinuteStart(t);
+            result.add(DateFormat.formatToTimeStr(normalized));
+        } else if (columnName.equals("hour_start")) {
+            normalized = TimeUtil.getHourStart(t);
+            result.add(DateFormat.formatToTimeStr(normalized));
+        } else if (columnName.equals("day_start")) {
+            //from day_start on, formatTs will output date format
+            normalized = TimeUtil.getDayStart(t);
+            result.add(DateFormat.formatToDateStr(normalized));
+        } else if (columnName.equals("week_start")) {
+            normalized = TimeUtil.getWeekStart(t);
+            result.add(DateFormat.formatToDateStr(normalized));
+        } else if (columnName.equals("month_start")) {
+            normalized = TimeUtil.getMonthStart(t);
+            result.add(DateFormat.formatToDateStr(normalized));
+        } else if (columnName.equals("quarter_start")) {
+            normalized = TimeUtil.getQuarterStart(t);
+            result.add(DateFormat.formatToDateStr(normalized));
+        } else if (columnName.equals("year_start")) {
+            normalized = TimeUtil.getYearStart(t);
+            result.add(DateFormat.formatToDateStr(normalized));
         }
+
+        return true;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index 9691ea7..5226899 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -55,7 +55,8 @@ public final class StringStreamingParser extends StreamingParser {
     }
 
     @Override
-    public StreamingMessage parse(MessageAndOffset kafkaMessage) {
+    public StreamingMessage parse(Object message) {
+        MessageAndOffset kafkaMessage = (MessageAndOffset) message;
         final ByteBuffer payload = kafkaMessage.message().payload();
         byte[] bytes = new byte[payload.limit()];
         payload.get(bytes);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index e3075d5..63f5637 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -40,9 +40,7 @@ import java.util.*;
 import kafka.message.MessageAndOffset;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +58,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
 
     private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
 
+
     private List<TblColRef> allColumns;
     private boolean formatTs = false;//not used
     private final ObjectMapper mapper = new ObjectMapper();
@@ -75,14 +74,14 @@ public final class TimedJsonStreamParser extends StreamingParser {
                     String[] parts = prop.split("=");
                     if (parts.length == 2) {
                         switch (parts[0]) {
-                        case "formatTs":
-                            this.formatTs = Boolean.valueOf(parts[1]);
-                            break;
-                        case "tsColName":
-                            this.tsColName = parts[1];
-                            break;
-                        default:
-                            break;
+                            case "formatTs":
+                                this.formatTs = Boolean.valueOf(parts[1]);
+                                break;
+                            case "tsColName":
+                                this.tsColName = parts[1];
+                                break;
+                            default:
+                                break;
                         }
                     }
                 } catch (Exception e) {
@@ -96,14 +95,13 @@ public final class TimedJsonStreamParser extends StreamingParser {
     }
 
     @Override
-    public StreamingMessage parse(MessageAndOffset messageAndOffset) {
+    public StreamingMessage parse(Object msg) {
+        MessageAndOffset messageAndOffset = (MessageAndOffset) msg;
         try {
             Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
             Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER);
             root.putAll(message);
             String tsStr = root.get(tsColName);
-            //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + //
-            //" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));
             long t;
             if (StringUtils.isEmpty(tsStr)) {
                 t = 0;
@@ -112,38 +110,16 @@ public final class TimedJsonStreamParser extends StreamingParser {
             }
             ArrayList<String> result = Lists.newArrayList();
 
-            long normalized = 0;
             for (TblColRef column : allColumns) {
-                String columnName = column.getName();
-                if (columnName.equalsIgnoreCase("minute_start")) {
-                    normalized = TimeUtil.getMinuteStart(t);
-                    result.add(DateFormat.formatToTimeStr(normalized));
-                } else if (columnName.equalsIgnoreCase("hour_start")) {
-                    normalized = TimeUtil.getHourStart(t);
-                    result.add(DateFormat.formatToTimeStr(normalized));
-                } else if (columnName.equalsIgnoreCase("day_start")) {
-                    //from day_start on, formatTs will output date format
-                    normalized = TimeUtil.getDayStart(t);
-                    result.add(DateFormat.formatToDateStr(normalized));
-                } else if (columnName.equalsIgnoreCase("week_start")) {
-                    normalized = TimeUtil.getWeekStart(t);
-                    result.add(DateFormat.formatToDateStr(normalized));
-                } else if (columnName.equalsIgnoreCase("month_start")) {
-                    normalized = TimeUtil.getMonthStart(t);
-                    result.add(DateFormat.formatToDateStr(normalized));
-                } else if (columnName.equalsIgnoreCase("quarter_start")) {
-                    normalized = TimeUtil.getQuarterStart(t);
-                    result.add(DateFormat.formatToDateStr(normalized));
-                } else if (columnName.equalsIgnoreCase("year_start")) {
-                    normalized = TimeUtil.getYearStart(t);
-                    result.add(DateFormat.formatToDateStr(normalized));
-                } else {
-                    String x = root.get(columnName.toLowerCase());
+                String columnName = column.getName().toLowerCase();
+
+                if (populateDerivedTimeColumns(columnName, result, t) == false) {
+                    String x = root.get(columnName);
                     result.add(x);
                 }
             }
 
-            return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object> emptyMap());
+            return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap());
 
         } catch (IOException e) {
             logger.error("error", e);
@@ -151,6 +127,7 @@ public final class TimedJsonStreamParser extends StreamingParser {
         }
     }
 
+
     @Override
     public boolean filter(StreamingMessage streamingMessage) {
         return true;