You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink in the original page and was not preserved in this copy.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/02/24 17:52:18 UTC

[druid] branch master updated: Performance fixes in proto readers (#12267)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b86f2d4  Performance fixes in proto readers (#12267)
b86f2d4 is described below

commit b86f2d4c2e935346d600e51b22403150ebd1501d
Author: Karan Kumar <ka...@gmail.com>
AuthorDate: Thu Feb 24 23:21:48 2022 +0530

    Performance fixes in proto readers (#12267)
---
 .../druid/data/input/impl/JsonInputFormat.java     |  2 +-
 .../druid/data/input/impl/NestedInputFormat.java   |  2 +-
 .../java/util/common/parsers/ObjectFlatteners.java |  5 +++--
 .../druid/data/input/avro/AvroOCFReader.java       |  2 +-
 .../druid/data/input/avro/AvroStreamReader.java    |  3 ++-
 .../org/apache/druid/data/input/orc/OrcReader.java |  3 ++-
 .../druid/data/input/parquet/ParquetReader.java    |  3 ++-
 .../druid/data/input/protobuf/ProtobufReader.java  |  6 +++---
 .../data/input/protobuf/ProtobufReaderTest.java    | 24 ++++++++++++++++++++++
 9 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
index 9b00131..8a9a3fd 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
@@ -65,7 +65,7 @@ public class JsonInputFormat extends NestedInputFormat
   }
 
   public JsonInputFormat(
-      JSONPathSpec flattenSpec,
+      @Nullable JSONPathSpec flattenSpec,
       Map<String, Boolean> featureSpec,
       Boolean keepNullColumns,
       boolean lineSplittable
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/NestedInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/NestedInputFormat.java
index 90faea6..02f1f4b 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/NestedInputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/NestedInputFormat.java
@@ -37,7 +37,7 @@ public abstract class NestedInputFormat implements InputFormat
 
   protected NestedInputFormat(@Nullable JSONPathSpec flattenSpec)
   {
-    this.flattenSpec = flattenSpec == null ? JSONPathSpec.DEFAULT : flattenSpec;
+    this.flattenSpec = flattenSpec;
   }
 
   @Nullable
diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
index 2130c01..78fd114 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
@@ -24,6 +24,7 @@ import com.jayway.jsonpath.spi.json.JsonProvider;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.UOE;
 
+import javax.annotation.Nullable;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -44,12 +45,12 @@ public class ObjectFlatteners
   }
 
   public static <T> ObjectFlattener<T> create(
-      final JSONPathSpec flattenSpec,
+      @Nullable final JSONPathSpec flattenSpecInput,
       final FlattenerMaker<T> flattenerMaker
   )
   {
     final Map<String, Function<T, Object>> extractors = new LinkedHashMap<>();
-
+    final JSONPathSpec flattenSpec = flattenSpecInput == null ? JSONPathSpec.DEFAULT : flattenSpecInput;
     for (final JSONPathFieldSpec fieldSpec : flattenSpec.getFields()) {
       final Function<T, Object> extractor;
 
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
index dc2a6d7..09888f7 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
@@ -55,7 +55,7 @@ public class AvroOCFReader extends IntermediateRowParsingReader<GenericRecord>
       InputEntity source,
       File temporaryDirectory,
       @Nullable Schema readerSchema,
-      JSONPathSpec flattenSpec,
+      @Nullable JSONPathSpec flattenSpec,
       boolean binaryAsString,
       boolean extractUnionsByType
   )
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
index c04ec14..50da0e5 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
@@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
 import org.apache.druid.java.util.common.parsers.ParseException;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -51,7 +52,7 @@ public class AvroStreamReader extends IntermediateRowParsingReader<GenericRecord
       InputRowSchema inputRowSchema,
       InputEntity source,
       AvroBytesDecoder avroBytesDecoder,
-      JSONPathSpec flattenSpec,
+      @Nullable JSONPathSpec flattenSpec,
       boolean binaryAsString,
       boolean extractUnionsByType
   )
diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
index b5ba677..ca10988 100644
--- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
+++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
@@ -41,6 +41,7 @@ import org.apache.orc.TypeDescription;
 import org.apache.orc.mapred.OrcMapredRecordReader;
 import org.apache.orc.mapred.OrcStruct;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
@@ -61,7 +62,7 @@ public class OrcReader extends IntermediateRowParsingReader<OrcStruct>
       InputRowSchema inputRowSchema,
       InputEntity source,
       File temporaryDirectory,
-      JSONPathSpec flattenSpec,
+      @Nullable JSONPathSpec flattenSpec,
       boolean binaryAsString
   )
   {
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
index 87c6cf6..bbd45f6 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
@@ -57,7 +58,7 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
       InputRowSchema inputRowSchema,
       InputEntity source,
       File temporaryDirectory,
-      JSONPathSpec flattenSpec,
+      @Nullable JSONPathSpec flattenSpec,
       boolean binaryAsString
   )
   {
diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
index 1715753..e24579f 100644
--- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
+++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
@@ -49,6 +49,7 @@ import java.util.Map;
 
 public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
 {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private final InputRowSchema inputRowSchema;
   private final InputEntity source;
   private final JSONPathSpec flattenSpec;
@@ -88,7 +89,7 @@ public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
   {
     Map<String, Object> record;
 
-    if (flattenSpec == null) {
+    if (flattenSpec == null || JSONPathSpec.DEFAULT.equals(flattenSpec)) {
       try {
         record = CollectionUtils.mapKeys(intermediateRow.getAllFields(), k -> k.getJsonName());
       }
@@ -98,8 +99,7 @@ public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
     } else {
       try {
         String json = JsonFormat.printer().print(intermediateRow);
-        JsonNode document = new ObjectMapper().readValue(json, JsonNode.class);
-        record = recordFlattener.flatten(document);
+        record = recordFlattener.flatten(OBJECT_MAPPER.readValue(json, JsonNode.class));
       }
       catch (InvalidProtocolBufferException e) {
         throw new ParseException(null, e, "Protobuf message could not be parsed");
diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
index 2df960d..bbce375 100644
--- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
+++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ParseException;
 import org.joda.time.DateTime;
 import org.joda.time.chrono.ISOChronology;
 import org.junit.Before;
@@ -122,4 +123,27 @@ public class ProtobufReaderTest
 
     ProtobufInputRowParserTest.verifyFlatDataWithComplexTimestamp(row, dateTime);
   }
+
+  @Test
+  public void testParseFlatDataWithComplexTimestampWithDefaultFlattenSpec() throws Exception
+  {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("is unparseable!");
+    ProtobufReader reader = new ProtobufReader(
+        inputRowSchemaWithComplexTimestamp,
+        null,
+        decoder,
+        JSONPathSpec.DEFAULT
+    );
+
+    //create binary of proto test event
+    DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
+    ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildFlatDataWithComplexTimestamp(dateTime);
+
+    ByteBuffer buffer = ProtobufInputRowParserTest.toByteBuffer(event);
+
+    InputRow row = reader.parseInputRows(decoder.parse(buffer)).get(0);
+
+    ProtobufInputRowParserTest.verifyFlatDataWithComplexTimestamp(row, dateTime);
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org