You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink that did not survive conversion to plain text.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/02/24 17:52:18 UTC
[druid] branch master updated: Performance fixes in proto readers (#12267)
This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b86f2d4 Performance fixes in proto readers (#12267)
b86f2d4 is described below
commit b86f2d4c2e935346d600e51b22403150ebd1501d
Author: Karan Kumar <ka...@gmail.com>
AuthorDate: Thu Feb 24 23:21:48 2022 +0530
Performance fixes in proto readers (#12267)
---
.../druid/data/input/impl/JsonInputFormat.java | 2 +-
.../druid/data/input/impl/NestedInputFormat.java | 2 +-
.../java/util/common/parsers/ObjectFlatteners.java | 5 +++--
.../druid/data/input/avro/AvroOCFReader.java | 2 +-
.../druid/data/input/avro/AvroStreamReader.java | 3 ++-
.../org/apache/druid/data/input/orc/OrcReader.java | 3 ++-
.../druid/data/input/parquet/ParquetReader.java | 3 ++-
.../druid/data/input/protobuf/ProtobufReader.java | 6 +++---
.../data/input/protobuf/ProtobufReaderTest.java | 24 ++++++++++++++++++++++
9 files changed, 39 insertions(+), 11 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
index 9b00131..8a9a3fd 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
@@ -65,7 +65,7 @@ public class JsonInputFormat extends NestedInputFormat
}
public JsonInputFormat(
- JSONPathSpec flattenSpec,
+ @Nullable JSONPathSpec flattenSpec,
Map<String, Boolean> featureSpec,
Boolean keepNullColumns,
boolean lineSplittable
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/NestedInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/NestedInputFormat.java
index 90faea6..02f1f4b 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/NestedInputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/NestedInputFormat.java
@@ -37,7 +37,7 @@ public abstract class NestedInputFormat implements InputFormat
protected NestedInputFormat(@Nullable JSONPathSpec flattenSpec)
{
- this.flattenSpec = flattenSpec == null ? JSONPathSpec.DEFAULT : flattenSpec;
+ this.flattenSpec = flattenSpec;
}
@Nullable
diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
index 2130c01..78fd114 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
@@ -24,6 +24,7 @@ import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.UOE;
+import javax.annotation.Nullable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
@@ -44,12 +45,12 @@ public class ObjectFlatteners
}
public static <T> ObjectFlattener<T> create(
- final JSONPathSpec flattenSpec,
+ @Nullable final JSONPathSpec flattenSpecInput,
final FlattenerMaker<T> flattenerMaker
)
{
final Map<String, Function<T, Object>> extractors = new LinkedHashMap<>();
-
+ final JSONPathSpec flattenSpec = flattenSpecInput == null ? JSONPathSpec.DEFAULT : flattenSpecInput;
for (final JSONPathFieldSpec fieldSpec : flattenSpec.getFields()) {
final Function<T, Object> extractor;
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
index dc2a6d7..09888f7 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
@@ -55,7 +55,7 @@ public class AvroOCFReader extends IntermediateRowParsingReader<GenericRecord>
InputEntity source,
File temporaryDirectory,
@Nullable Schema readerSchema,
- JSONPathSpec flattenSpec,
+ @Nullable JSONPathSpec flattenSpec,
boolean binaryAsString,
boolean extractUnionsByType
)
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
index c04ec14..50da0e5 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
@@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -51,7 +52,7 @@ public class AvroStreamReader extends IntermediateRowParsingReader<GenericRecord
InputRowSchema inputRowSchema,
InputEntity source,
AvroBytesDecoder avroBytesDecoder,
- JSONPathSpec flattenSpec,
+ @Nullable JSONPathSpec flattenSpec,
boolean binaryAsString,
boolean extractUnionsByType
)
diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
index b5ba677..ca10988 100644
--- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
+++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
@@ -41,6 +41,7 @@ import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcMapredRecordReader;
import org.apache.orc.mapred.OrcStruct;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
@@ -61,7 +62,7 @@ public class OrcReader extends IntermediateRowParsingReader<OrcStruct>
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
- JSONPathSpec flattenSpec,
+ @Nullable JSONPathSpec flattenSpec,
boolean binaryAsString
)
{
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
index 87c6cf6..bbd45f6 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
@@ -57,7 +58,7 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
- JSONPathSpec flattenSpec,
+ @Nullable JSONPathSpec flattenSpec,
boolean binaryAsString
)
{
diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
index 1715753..e24579f 100644
--- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
+++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
@@ -49,6 +49,7 @@ import java.util.Map;
public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
{
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final InputRowSchema inputRowSchema;
private final InputEntity source;
private final JSONPathSpec flattenSpec;
@@ -88,7 +89,7 @@ public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
{
Map<String, Object> record;
- if (flattenSpec == null) {
+ if (flattenSpec == null || JSONPathSpec.DEFAULT.equals(flattenSpec)) {
try {
record = CollectionUtils.mapKeys(intermediateRow.getAllFields(), k -> k.getJsonName());
}
@@ -98,8 +99,7 @@ public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
} else {
try {
String json = JsonFormat.printer().print(intermediateRow);
- JsonNode document = new ObjectMapper().readValue(json, JsonNode.class);
- record = recordFlattener.flatten(document);
+ record = recordFlattener.flatten(OBJECT_MAPPER.readValue(json, JsonNode.class));
}
catch (InvalidProtocolBufferException e) {
throw new ParseException(null, e, "Protobuf message could not be parsed");
diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
index 2df960d..bbce375 100644
--- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
+++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ParseException;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Before;
@@ -122,4 +123,27 @@ public class ProtobufReaderTest
ProtobufInputRowParserTest.verifyFlatDataWithComplexTimestamp(row, dateTime);
}
+
+ @Test
+ public void testParseFlatDataWithComplexTimestampWithDefaultFlattenSpec() throws Exception
+ {
+ expectedException.expect(ParseException.class);
+ expectedException.expectMessage("is unparseable!");
+ ProtobufReader reader = new ProtobufReader(
+ inputRowSchemaWithComplexTimestamp,
+ null,
+ decoder,
+ JSONPathSpec.DEFAULT
+ );
+
+ //create binary of proto test event
+ DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
+ ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildFlatDataWithComplexTimestamp(dateTime);
+
+ ByteBuffer buffer = ProtobufInputRowParserTest.toByteBuffer(event);
+
+ InputRow row = reader.parseInputRows(decoder.parse(buffer)).get(0);
+
+ ProtobufInputRowParserTest.verifyFlatDataWithComplexTimestamp(row, dateTime);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org