You are viewing a plain-text version of this content. (The link to the canonical version, originally attached to the word "here", was not preserved in this copy.)
Posted to commits@druid.apache.org by as...@apache.org on 2021/07/19 16:32:31 UTC
[druid] branch master updated: Fix avro json serde issues (#11455)
This is an automated email from the ASF dual-hosted git repository.
asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2705fe9 Fix avro json serde issues (#11455)
2705fe9 is described below
commit 2705fe98fa4513c506ac4ffb4c71fc2be51f7738
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Jul 19 09:32:05 2021 -0700
Fix avro json serde issues (#11455)
---
.../druid/data/input/AvroHadoopInputRowParser.java | 46 +++++++++
.../druid/data/input/AvroStreamInputRowParser.java | 29 +++++-
.../druid/data/input/avro/AvroOCFInputFormat.java | 35 ++++++-
.../data/input/avro/AvroStreamInputFormat.java | 13 ++-
.../data/input/AvroHadoopInputRowParserTest.java | 24 +++++
.../data/input/AvroStreamInputFormatTest.java | 19 +++-
.../data/input/AvroStreamInputRowParserTest.java | 18 ++++
.../data/input/avro/AvroOCFInputFormatTest.java | 108 +++++++++++++++++++++
8 files changed, 284 insertions(+), 8 deletions(-)
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
index 165f6a4..39bdc87 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
@@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import java.util.List;
+import java.util.Objects;
public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
{
@@ -74,9 +75,54 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
return fromPigAvroStorage;
}
+ @JsonProperty
+ public Boolean getBinaryAsString()
+ {
+ return binaryAsString;
+ }
+
+ @JsonProperty
+ public Boolean isExtractUnionsByType()
+ {
+ return extractUnionsByType;
+ }
+
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage, binaryAsString, extractUnionsByType);
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AvroHadoopInputRowParser that = (AvroHadoopInputRowParser) o;
+ return fromPigAvroStorage == that.fromPigAvroStorage
+ && binaryAsString == that.binaryAsString
+ && extractUnionsByType == that.extractUnionsByType
+ && Objects.equals(parseSpec, that.parseSpec);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(parseSpec, fromPigAvroStorage, binaryAsString, extractUnionsByType);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "AvroHadoopInputRowParser{" +
+ "parseSpec=" + parseSpec +
+ ", fromPigAvroStorage=" + fromPigAvroStorage +
+ ", binaryAsString=" + binaryAsString +
+ ", extractUnionsByType=" + extractUnionsByType +
+ '}';
+ }
}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
index fcb8bd8..969bf25 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
@@ -77,6 +77,18 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
return avroBytesDecoder;
}
+ @JsonProperty
+ public Boolean getBinaryAsString()
+ {
+ return binaryAsString;
+ }
+
+ @JsonProperty
+ public Boolean isExtractUnionsByType()
+ {
+ return extractUnionsByType;
+ }
+
@Override
public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec)
{
@@ -99,12 +111,25 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
}
final AvroStreamInputRowParser that = (AvroStreamInputRowParser) o;
return Objects.equals(parseSpec, that.parseSpec) &&
- Objects.equals(avroBytesDecoder, that.avroBytesDecoder);
+ Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
+ Objects.equals(binaryAsString, that.binaryAsString) &&
+ Objects.equals(extractUnionsByType, that.extractUnionsByType);
}
@Override
public int hashCode()
{
- return Objects.hash(parseSpec, avroBytesDecoder);
+ return Objects.hash(parseSpec, avroBytesDecoder, binaryAsString, extractUnionsByType);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "AvroStreamInputRowParser{" +
+ "parseSpec=" + parseSpec +
+ ", binaryAsString=" + binaryAsString +
+ ", extractUnionsByType=" + extractUnionsByType +
+ ", avroBytesDecoder=" + avroBytesDecoder +
+ '}';
}
}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
index 7417477..f5ea6a3 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
@@ -43,6 +43,8 @@ public class AvroOCFInputFormat extends NestedInputFormat
private final boolean binaryAsString;
private final boolean extractUnionsByType;
+ private final Map<String, Object> schema;
+
@Nullable
private final Schema readerSchema;
@@ -56,6 +58,7 @@ public class AvroOCFInputFormat extends NestedInputFormat
) throws Exception
{
super(flattenSpec);
+ this.schema = schema;
// If a reader schema is supplied create the datum reader with said schema, otherwise use the writer schema
if (schema != null) {
String schemaStr = mapper.writeValueAsString(schema);
@@ -76,6 +79,24 @@ public class AvroOCFInputFormat extends NestedInputFormat
return false;
}
+ @JsonProperty
+ public Map<String, Object> getSchema()
+ {
+ return schema;
+ }
+
+ @JsonProperty
+ public Boolean getBinaryAsString()
+ {
+ return binaryAsString;
+ }
+
+ @JsonProperty
+ public Boolean isExtractUnionsByType()
+ {
+ return extractUnionsByType;
+ }
+
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
@@ -103,13 +124,23 @@ public class AvroOCFInputFormat extends NestedInputFormat
return false;
}
AvroOCFInputFormat that = (AvroOCFInputFormat) o;
- return binaryAsString == that.binaryAsString &&
+ return binaryAsString == that.binaryAsString && extractUnionsByType == that.extractUnionsByType &&
Objects.equals(readerSchema, that.readerSchema);
}
@Override
public int hashCode()
{
- return Objects.hash(super.hashCode(), binaryAsString, readerSchema);
+ return Objects.hash(super.hashCode(), binaryAsString, readerSchema, extractUnionsByType);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "AvroOCFInputFormat{" +
+ "binaryAsString=" + binaryAsString +
+ ", extractUnionsByType=" + extractUnionsByType +
+ ", readerSchema=" + readerSchema +
+ '}';
}
}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
index d42f359..bf0cecf 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
@@ -71,6 +71,12 @@ public class AvroStreamInputFormat extends NestedInputFormat
return binaryAsString;
}
+ @JsonProperty
+ public Boolean isExtractUnionsByType()
+ {
+ return extractUnionsByType;
+ }
+
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
@@ -95,13 +101,14 @@ public class AvroStreamInputFormat extends NestedInputFormat
}
final AvroStreamInputFormat that = (AvroStreamInputFormat) o;
return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) &&
- Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
- Objects.equals(binaryAsString, that.binaryAsString);
+ Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
+ Objects.equals(binaryAsString, that.binaryAsString) &&
+ Objects.equals(extractUnionsByType, that.extractUnionsByType);
}
@Override
public int hashCode()
{
- return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString);
+ return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString, extractUnionsByType);
}
}
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
index c6ade85..12791b5 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
@@ -29,6 +29,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.druid.data.input.avro.AvroExtensionsModule;
import org.apache.druid.java.util.common.FileUtils;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -49,6 +50,28 @@ public class AvroHadoopInputRowParserTest
}
@Test
+ public void testSerde() throws IOException
+ {
+ AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, false, false, false);
+ AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
+ jsonMapper.writeValueAsBytes(parser),
+ AvroHadoopInputRowParser.class
+ );
+ Assert.assertEquals(parser, parser2);
+ }
+
+ @Test
+ public void testSerdeNonDefaults() throws IOException
+ {
+ AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, true, true, true);
+ AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
+ jsonMapper.writeValueAsBytes(parser),
+ AvroHadoopInputRowParser.class
+ );
+ Assert.assertEquals(parser, parser2);
+ }
+
+ @Test
public void testParseNotFromPigAvroStorage() throws IOException
{
testParse(AvroStreamInputRowParserTest.buildSomeAvroDatum(), false);
@@ -67,6 +90,7 @@ public class AvroHadoopInputRowParserTest
jsonMapper.writeValueAsBytes(parser),
AvroHadoopInputRowParser.class
);
+ Assert.assertEquals(parser, parser2);
InputRow inputRow = parser2.parseBatch(record).get(0);
AvroStreamInputRowParserTest.assertInputRowCorrect(inputRow, AvroStreamInputRowParserTest.DIMENSIONS, fromPigAvroStorage);
}
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
index 918aa0a..4213008 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
@@ -64,7 +64,6 @@ import static org.apache.druid.data.input.AvroStreamInputRowParserTest.buildSome
public class AvroStreamInputFormatTest
{
-
private static final String EVENT_TYPE = "eventType";
private static final String ID = "id";
private static final String SOME_OTHER_ID = "someOtherId";
@@ -130,6 +129,24 @@ public class AvroStreamInputFormatTest
}
@Test
+ public void testSerdeNonDefault() throws IOException
+ {
+ Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
+ AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
+ flattenSpec,
+ new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+ true,
+ true
+ );
+ NestedInputFormat inputFormat2 = jsonMapper.readValue(
+ jsonMapper.writeValueAsString(inputFormat),
+ NestedInputFormat.class
+ );
+
+ Assert.assertEquals(inputFormat, inputFormat2);
+ }
+
+ @Test
public void testSerdeForSchemaRegistry() throws IOException
{
AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
index 8ed9f44..69608db 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
@@ -190,6 +190,24 @@ public class AvroStreamInputRowParserTest
}
@Test
+ public void testSerdeNonDefault() throws IOException
+ {
+ Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
+ AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
+ PARSE_SPEC,
+ new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+ true,
+ true
+ );
+ ByteBufferInputRowParser parser2 = jsonMapper.readValue(
+ jsonMapper.writeValueAsString(parser),
+ ByteBufferInputRowParser.class
+ );
+
+ Assert.assertEquals(parser, parser2);
+ }
+
+ @Test
public void testParse() throws SchemaValidationException, IOException
{
// serde test
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java
new file mode 100644
index 0000000..a5c40a7
--- /dev/null
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.avro;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class AvroOCFInputFormatTest
+{
+ private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+ private JSONPathSpec flattenSpec;
+
+ @Before
+ public void before()
+ {
+ flattenSpec = new JSONPathSpec(
+ true,
+ ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "nested", "someRecord.subLong")
+ )
+ );
+ for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) {
+ jsonMapper.registerModule(jacksonModule);
+ }
+ jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
+ }
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(
+ jsonMapper,
+ flattenSpec,
+ null,
+ false,
+ false
+ );
+ NestedInputFormat inputFormat2 = jsonMapper.readValue(
+ jsonMapper.writeValueAsString(inputFormat),
+ NestedInputFormat.class
+ );
+
+ Assert.assertEquals(inputFormat, inputFormat2);
+ }
+
+ @Test
+ public void testSerdeNonDefaults() throws Exception
+ {
+ String schemaStr = "{\n"
+ + " \"namespace\": \"org.apache.druid.data.input\",\n"
+ + " \"name\": \"SomeAvroDatum\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"fields\" : [\n"
+ + " {\"name\":\"timestamp\",\"type\":\"long\"},\n"
+ + " {\"name\":\"someLong\",\"type\":\"long\"}\n,"
+ + " {\"name\":\"eventClass\",\"type\":\"string\", \"aliases\": [\"eventType\"]}\n"
+ + " ]\n"
+ + "}";
+
+ TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>()
+ {
+ };
+ final Map<String, Object> readerSchema = jsonMapper.readValue(schemaStr, typeRef);
+ AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(
+ jsonMapper,
+ flattenSpec,
+ readerSchema,
+ true,
+ true
+ );
+ NestedInputFormat inputFormat2 = jsonMapper.readValue(
+ jsonMapper.writeValueAsString(inputFormat),
+ NestedInputFormat.class
+ );
+
+ Assert.assertEquals(inputFormat, inputFormat2);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org