You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@druid.apache.org by as...@apache.org on 2018/11/16 03:48:37 UTC

[incubator-druid] branch master updated: optimize input row parsers (#6590)

This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 93b0d58  optimize input row parsers (#6590)
93b0d58 is described below

commit 93b0d585710668b4e2920db9e6f40ba92b6c50b1
Author: Mingming Qiu <cs...@gmail.com>
AuthorDate: Fri Nov 16 11:48:32 2018 +0800

    optimize input row parsers (#6590)
    
    * optimize input row parsers
    
    * address comments
---
 .../druid/data/input/impl/MapInputRowParser.java   | 21 +++++++++++----------
 .../data/input/orc/OrcHadoopInputRowParser.java    |  9 +++++++++
 .../data/input/thrift/ThriftInputRowParser.java    | 15 +++++++++++++--
 .../druid/data/input/AvroHadoopInputRowParser.java |  5 ++++-
 .../druid/data/input/AvroStreamInputRowParser.java |  5 ++++-
 .../apache/druid/data/input/avro/AvroParsers.java  |  4 ++--
 .../avro/ParquetAvroHadoopInputRowParser.java      | 22 +++++++++++-----------
 .../input/protobuf/ProtobufInputRowParser.java     | 15 +++++++++++++--
 8 files changed, 67 insertions(+), 29 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
index 133e574..80db2c4 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java
@@ -36,6 +36,7 @@ import java.util.Map;
 public class MapInputRowParser implements InputRowParser<Map<String, Object>>
 {
   private final ParseSpec parseSpec;
+  private final List<String> dimensions;
 
   @JsonCreator
   public MapInputRowParser(
@@ -43,20 +44,20 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
   )
   {
     this.parseSpec = parseSpec;
+    this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
   }
 
   @Override
   public List<InputRow> parseBatch(Map<String, Object> theMap)
   {
-    final List<String> dimensions = parseSpec.getDimensionsSpec().hasCustomDimensions()
-                                    ? parseSpec.getDimensionsSpec().getDimensionNames()
-                                    : Lists.newArrayList(
-                                        Sets.difference(
-                                            theMap.keySet(),
-                                            parseSpec.getDimensionsSpec()
-                                                     .getDimensionExclusions()
-                                        )
-                                    );
+    final List<String> dimensions;
+    if (!this.dimensions.isEmpty()) {
+      dimensions = this.dimensions;
+    } else {
+      dimensions = Lists.newArrayList(
+          Sets.difference(theMap.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
 
     final DateTime timestamp;
     try {
@@ -75,7 +76,7 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
       throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
     }
 
-    return ImmutableList.of(new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap));
+    return ImmutableList.of(new MapBasedInputRow(timestamp, dimensions, theMap));
   }
 
   @JsonProperty
diff --git a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
index 21b249c..e31bcd8 100644
--- a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
+++ b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
@@ -25,6 +25,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.InputRowParser;
@@ -129,6 +130,14 @@ public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
     TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
     DateTime dateTime = timestampSpec.extractTimestamp(map);
 
+    final List<String> dimensions;
+    if (!this.dimensions.isEmpty()) {
+      dimensions = this.dimensions;
+    } else {
+      dimensions = Lists.newArrayList(
+          Sets.difference(map.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
     return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, map));
   }
 
diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java
index 740b888..8e148d2 100644
--- a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java
+++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
@@ -55,6 +57,7 @@ public class ThriftInputRowParser implements InputRowParser<Object>
 
   private Parser<String, Object> parser;
   private volatile Class<TBase> thriftClass = null;
+  private final List<String> dimensions;
 
   @JsonCreator
   public ThriftInputRowParser(
@@ -68,6 +71,7 @@ public class ThriftInputRowParser implements InputRowParser<Object>
     Preconditions.checkNotNull(thriftClassName, "thrift class name");
 
     this.parseSpec = parseSpec;
+    this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
   }
 
   public Class<TBase> getThriftClass()
@@ -139,10 +143,17 @@ public class ThriftInputRowParser implements InputRowParser<Object>
     }
 
     Map<String, Object> record = parser.parseToMap(json);
-
+    final List<String> dimensions;
+    if (!this.dimensions.isEmpty()) {
+      dimensions = this.dimensions;
+    } else {
+      dimensions = Lists.newArrayList(
+          Sets.difference(record.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
     return ImmutableList.of(new MapBasedInputRow(
         parseSpec.getTimestampSpec().extractTimestamp(record),
-        parseSpec.getDimensionsSpec().getDimensionNames(),
+        dimensions,
         record
     ));
   }
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
index 19de982..83b2734 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.druid.data.input.avro.AvroParsers;
 import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 
@@ -33,6 +34,7 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
   private final ParseSpec parseSpec;
   private final boolean fromPigAvroStorage;
   private final ObjectFlattener<GenericRecord> avroFlattener;
+  private final MapInputRowParser mapParser;
 
   @JsonCreator
   public AvroHadoopInputRowParser(
@@ -43,12 +45,13 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
     this.parseSpec = parseSpec;
     this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
     this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false);
+    this.mapParser = new MapInputRowParser(parseSpec);
   }
 
   @Override
   public List<InputRow> parseBatch(GenericRecord record)
   {
-    return AvroParsers.parseGenericRecord(record, parseSpec, avroFlattener);
+    return AvroParsers.parseGenericRecord(record, mapParser, avroFlattener);
   }
 
   @JsonProperty
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
index ab5c2d3..749970f 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.druid.data.input.avro.AvroBytesDecoder;
 import org.apache.druid.data.input.avro.AvroParsers;
+import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 
@@ -36,6 +37,7 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
   private final ParseSpec parseSpec;
   private final AvroBytesDecoder avroBytesDecoder;
   private final ObjectFlattener<GenericRecord> avroFlattener;
+  private final MapInputRowParser mapParser;
 
   @JsonCreator
   public AvroStreamInputRowParser(
@@ -46,12 +48,13 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
     this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
     this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
     this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false);
+    this.mapParser = new MapInputRowParser(parseSpec);
   }
 
   @Override
   public List<InputRow> parseBatch(ByteBuffer input)
   {
-    return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, avroFlattener);
+    return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), mapParser, avroFlattener);
   }
 
   @JsonProperty
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
index 1adb4bf..92ea3ae 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
@@ -54,10 +54,10 @@ public class AvroParsers
 
   public static List<InputRow> parseGenericRecord(
       GenericRecord record,
-      ParseSpec parseSpec,
+      MapInputRowParser mapParser,
       ObjectFlattener<GenericRecord> avroFlattener
   )
   {
-    return new MapInputRowParser(parseSpec).parseBatch(avroFlattener.flatten(record));
+    return mapParser.parseBatch(avroFlattener.flatten(record));
   }
 }
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
index cdf1f85..330d9e1 100755
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.parquet.avro;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -40,7 +41,6 @@ import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -51,7 +51,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
   private final boolean binaryAsString;
   private final TimestampSpec timestampSpec;
   private final ObjectFlattener<GenericRecord> recordFlattener;
-
+  private final List<String> dimensions;
 
   @JsonCreator
   public ParquetAvroHadoopInputRowParser(
@@ -61,6 +61,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
   {
     this.parseSpec = parseSpec;
     this.timestampSpec = parseSpec.getTimestampSpec();
+    this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
     this.binaryAsString = binaryAsString == null ? false : binaryAsString;
 
     final JSONPathSpec flattenSpec;
@@ -95,15 +96,14 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
   {
     Map<String, Object> row = recordFlattener.flatten(record);
 
-    final List<String> dimensions = parseSpec.getDimensionsSpec().hasCustomDimensions()
-                                    ? parseSpec.getDimensionsSpec().getDimensionNames()
-                                    : new ArrayList(
-                                        Sets.difference(
-                                            row.keySet(),
-                                            parseSpec.getDimensionsSpec()
-                                                     .getDimensionExclusions()
-                                        )
-                                    );
+    final List<String> dimensions;
+    if (!this.dimensions.isEmpty()) {
+      dimensions = this.dimensions;
+    } else {
+      dimensions = Lists.newArrayList(
+          Sets.difference(row.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
     // check for parquet Date
     // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
     LogicalType logicalType = determineTimestampSpecLogicalType(record.getSchema(), timestampSpec.getTimestampColumn());
diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java
index b9075a1..b75e400 100644
--- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java
+++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.github.os72.protobuf.dynamic.DynamicSchema;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.Descriptor;
@@ -53,7 +55,7 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
   private final String protoMessageType;
   private final Descriptor descriptor;
   private Parser<String, Object> parser;
-
+  private final List<String> dimensions;
 
   @JsonCreator
   public ProtobufInputRowParser(
@@ -66,6 +68,7 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
     this.descriptorFilePath = descriptorFilePath;
     this.protoMessageType = protoMessageType;
     this.descriptor = getDescriptor(descriptorFilePath);
+    this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
   }
 
   @Override
@@ -98,9 +101,17 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
     }
 
     Map<String, Object> record = parser.parseToMap(json);
+    final List<String> dimensions;
+    if (!this.dimensions.isEmpty()) {
+      dimensions = this.dimensions;
+    } else {
+      dimensions = Lists.newArrayList(
+          Sets.difference(record.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
     return ImmutableList.of(new MapBasedInputRow(
         parseSpec.getTimestampSpec().extractTimestamp(record),
-        parseSpec.getDimensionsSpec().getDimensionNames(),
+        dimensions,
         record
     ));
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org