You are viewing a plain-text version of this content; the hyperlink to the canonical copy was removed during plain-text conversion (see the mailing-list archive for the original message).
Posted to commits@druid.apache.org by as...@apache.org on 2021/07/19 16:32:31 UTC

[druid] branch master updated: Fix avro json serde issues (#11455)

This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2705fe9  Fix avro json serde issues (#11455)
2705fe9 is described below

commit 2705fe98fa4513c506ac4ffb4c71fc2be51f7738
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Mon Jul 19 09:32:05 2021 -0700

    Fix avro json serde issues (#11455)
---
 .../druid/data/input/AvroHadoopInputRowParser.java |  46 +++++++++
 .../druid/data/input/AvroStreamInputRowParser.java |  29 +++++-
 .../druid/data/input/avro/AvroOCFInputFormat.java  |  35 ++++++-
 .../data/input/avro/AvroStreamInputFormat.java     |  13 ++-
 .../data/input/AvroHadoopInputRowParserTest.java   |  24 +++++
 .../data/input/AvroStreamInputFormatTest.java      |  19 +++-
 .../data/input/AvroStreamInputRowParserTest.java   |  18 ++++
 .../data/input/avro/AvroOCFInputFormatTest.java    | 108 +++++++++++++++++++++
 8 files changed, 284 insertions(+), 8 deletions(-)

diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
index 165f6a4..39bdc87 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
@@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 
 import java.util.List;
+import java.util.Objects;
 
 public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
 {
@@ -74,9 +75,54 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
     return fromPigAvroStorage;
   }
 
+  @JsonProperty
+  public Boolean getBinaryAsString()
+  {
+    return binaryAsString;
+  }
+
+  @JsonProperty
+  public Boolean isExtractUnionsByType()
+  {
+    return extractUnionsByType;
+  }
+
   @Override
   public InputRowParser withParseSpec(ParseSpec parseSpec)
   {
     return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage, binaryAsString, extractUnionsByType);
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    AvroHadoopInputRowParser that = (AvroHadoopInputRowParser) o;
+    return fromPigAvroStorage == that.fromPigAvroStorage
+           && binaryAsString == that.binaryAsString
+           && extractUnionsByType == that.extractUnionsByType
+           && Objects.equals(parseSpec, that.parseSpec);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(parseSpec, fromPigAvroStorage, binaryAsString, extractUnionsByType);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "AvroHadoopInputRowParser{" +
+           "parseSpec=" + parseSpec +
+           ", fromPigAvroStorage=" + fromPigAvroStorage +
+           ", binaryAsString=" + binaryAsString +
+           ", extractUnionsByType=" + extractUnionsByType +
+           '}';
+  }
 }
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
index fcb8bd8..969bf25 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
@@ -77,6 +77,18 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
     return avroBytesDecoder;
   }
 
+  @JsonProperty
+  public Boolean getBinaryAsString()
+  {
+    return binaryAsString;
+  }
+
+  @JsonProperty
+  public Boolean isExtractUnionsByType()
+  {
+    return extractUnionsByType;
+  }
+
   @Override
   public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec)
   {
@@ -99,12 +111,25 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
     }
     final AvroStreamInputRowParser that = (AvroStreamInputRowParser) o;
     return Objects.equals(parseSpec, that.parseSpec) &&
-           Objects.equals(avroBytesDecoder, that.avroBytesDecoder);
+           Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
+           Objects.equals(binaryAsString, that.binaryAsString) &&
+           Objects.equals(extractUnionsByType, that.extractUnionsByType);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(parseSpec, avroBytesDecoder);
+    return Objects.hash(parseSpec, avroBytesDecoder, binaryAsString, extractUnionsByType);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "AvroStreamInputRowParser{" +
+           "parseSpec=" + parseSpec +
+           ", binaryAsString=" + binaryAsString +
+           ", extractUnionsByType=" + extractUnionsByType +
+           ", avroBytesDecoder=" + avroBytesDecoder +
+           '}';
   }
 }
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
index 7417477..f5ea6a3 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java
@@ -43,6 +43,8 @@ public class AvroOCFInputFormat extends NestedInputFormat
 
   private final boolean binaryAsString;
   private final boolean extractUnionsByType;
+  private final Map<String, Object> schema;
+
   @Nullable
   private final Schema readerSchema;
 
@@ -56,6 +58,7 @@ public class AvroOCFInputFormat extends NestedInputFormat
   ) throws Exception
   {
     super(flattenSpec);
+    this.schema = schema;
     // If a reader schema is supplied create the datum reader with said schema, otherwise use the writer schema
     if (schema != null) {
       String schemaStr = mapper.writeValueAsString(schema);
@@ -76,6 +79,24 @@ public class AvroOCFInputFormat extends NestedInputFormat
     return false;
   }
 
+  @JsonProperty
+  public Map<String, Object> getSchema()
+  {
+    return schema;
+  }
+
+  @JsonProperty
+  public Boolean getBinaryAsString()
+  {
+    return binaryAsString;
+  }
+
+  @JsonProperty
+  public Boolean isExtractUnionsByType()
+  {
+    return extractUnionsByType;
+  }
+
   @Override
   public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
@@ -103,13 +124,23 @@ public class AvroOCFInputFormat extends NestedInputFormat
       return false;
     }
     AvroOCFInputFormat that = (AvroOCFInputFormat) o;
-    return binaryAsString == that.binaryAsString &&
+    return binaryAsString == that.binaryAsString && extractUnionsByType == that.extractUnionsByType &&
            Objects.equals(readerSchema, that.readerSchema);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), binaryAsString, readerSchema);
+    return Objects.hash(super.hashCode(), binaryAsString, readerSchema, extractUnionsByType);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "AvroOCFInputFormat{" +
+           "binaryAsString=" + binaryAsString +
+           ", extractUnionsByType=" + extractUnionsByType +
+           ", readerSchema=" + readerSchema +
+           '}';
   }
 }
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
index d42f359..bf0cecf 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
@@ -71,6 +71,12 @@ public class AvroStreamInputFormat extends NestedInputFormat
     return binaryAsString;
   }
 
+  @JsonProperty
+  public Boolean isExtractUnionsByType()
+  {
+    return extractUnionsByType;
+  }
+
   @Override
   public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
@@ -95,13 +101,14 @@ public class AvroStreamInputFormat extends NestedInputFormat
     }
     final AvroStreamInputFormat that = (AvroStreamInputFormat) o;
     return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) &&
-        Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
-        Objects.equals(binaryAsString, that.binaryAsString);
+           Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
+           Objects.equals(binaryAsString, that.binaryAsString) &&
+           Objects.equals(extractUnionsByType, that.extractUnionsByType);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString);
+    return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString, extractUnionsByType);
   }
 }
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
index c6ade85..12791b5 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
@@ -29,6 +29,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.druid.data.input.avro.AvroExtensionsModule;
 import org.apache.druid.java.util.common.FileUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -49,6 +50,28 @@ public class AvroHadoopInputRowParserTest
   }
 
   @Test
+  public void testSerde() throws IOException
+  {
+    AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, false, false, false);
+    AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
+        jsonMapper.writeValueAsBytes(parser),
+        AvroHadoopInputRowParser.class
+    );
+    Assert.assertEquals(parser, parser2);
+  }
+
+  @Test
+  public void testSerdeNonDefaults() throws IOException
+  {
+    AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, true, true, true);
+    AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
+        jsonMapper.writeValueAsBytes(parser),
+        AvroHadoopInputRowParser.class
+    );
+    Assert.assertEquals(parser, parser2);
+  }
+
+  @Test
   public void testParseNotFromPigAvroStorage() throws IOException
   {
     testParse(AvroStreamInputRowParserTest.buildSomeAvroDatum(), false);
@@ -67,6 +90,7 @@ public class AvroHadoopInputRowParserTest
         jsonMapper.writeValueAsBytes(parser),
         AvroHadoopInputRowParser.class
     );
+    Assert.assertEquals(parser, parser2);
     InputRow inputRow = parser2.parseBatch(record).get(0);
     AvroStreamInputRowParserTest.assertInputRowCorrect(inputRow, AvroStreamInputRowParserTest.DIMENSIONS, fromPigAvroStorage);
   }
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
index 918aa0a..4213008 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java
@@ -64,7 +64,6 @@ import static org.apache.druid.data.input.AvroStreamInputRowParserTest.buildSome
 
 public class AvroStreamInputFormatTest
 {
-
   private static final String EVENT_TYPE = "eventType";
   private static final String ID = "id";
   private static final String SOME_OTHER_ID = "someOtherId";
@@ -130,6 +129,24 @@ public class AvroStreamInputFormatTest
   }
 
   @Test
+  public void testSerdeNonDefault() throws IOException
+  {
+    Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
+    AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
+        flattenSpec,
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        true,
+        true
+    );
+    NestedInputFormat inputFormat2 = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(inputFormat),
+        NestedInputFormat.class
+    );
+
+    Assert.assertEquals(inputFormat, inputFormat2);
+  }
+
+  @Test
   public void testSerdeForSchemaRegistry() throws IOException
   {
     AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
index 8ed9f44..69608db 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
@@ -190,6 +190,24 @@ public class AvroStreamInputRowParserTest
   }
 
   @Test
+  public void testSerdeNonDefault() throws IOException
+  {
+    Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
+    AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
+        PARSE_SPEC,
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
+        true,
+        true
+    );
+    ByteBufferInputRowParser parser2 = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(parser),
+        ByteBufferInputRowParser.class
+    );
+
+    Assert.assertEquals(parser, parser2);
+  }
+
+  @Test
   public void testParse() throws SchemaValidationException, IOException
   {
     // serde test
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java
new file mode 100644
index 0000000..a5c40a7
--- /dev/null
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.avro;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class AvroOCFInputFormatTest
+{
+  private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+  private JSONPathSpec flattenSpec;
+
+  @Before
+  public void before()
+  {
+    flattenSpec = new JSONPathSpec(
+        true,
+        ImmutableList.of(
+            new JSONPathFieldSpec(JSONPathFieldType.PATH, "nested", "someRecord.subLong")
+        )
+    );
+    for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) {
+      jsonMapper.registerModule(jacksonModule);
+    }
+    jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
+  }
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(
+        jsonMapper,
+        flattenSpec,
+        null,
+        false,
+        false
+    );
+    NestedInputFormat inputFormat2 = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(inputFormat),
+        NestedInputFormat.class
+    );
+
+    Assert.assertEquals(inputFormat, inputFormat2);
+  }
+
+  @Test
+  public void testSerdeNonDefaults() throws Exception
+  {
+    String schemaStr = "{\n"
+                       + "  \"namespace\": \"org.apache.druid.data.input\",\n"
+                       + "  \"name\": \"SomeAvroDatum\",\n"
+                       + "  \"type\": \"record\",\n"
+                       + "  \"fields\" : [\n"
+                       + "    {\"name\":\"timestamp\",\"type\":\"long\"},\n"
+                       + "    {\"name\":\"someLong\",\"type\":\"long\"}\n,"
+                       + "    {\"name\":\"eventClass\",\"type\":\"string\", \"aliases\": [\"eventType\"]}\n"
+                       + "  ]\n"
+                       + "}";
+
+    TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>()
+    {
+    };
+    final Map<String, Object> readerSchema = jsonMapper.readValue(schemaStr, typeRef);
+    AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(
+        jsonMapper,
+        flattenSpec,
+        readerSchema,
+        true,
+        true
+    );
+    NestedInputFormat inputFormat2 = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(inputFormat),
+        NestedInputFormat.class
+    );
+
+    Assert.assertEquals(inputFormat, inputFormat2);
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org