You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/06/14 21:19:08 UTC

[incubator-druid] branch master updated: Remove Apache Pig from the tests (#7810)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f581118  Remove Apache Pig from the tests (#7810)
f581118 is described below

commit f581118f05a8dcbd5e642d03025fc125bcc0bad8
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Fri Jun 14 23:18:58 2019 +0200

    Remove Apache Pig from the tests (#7810)
    
    * Remove Apache Pig from the tests
    
    * Remove the Pig specific part
    
    * Fix the Checkstyle issues
    
    * Cleanup a bit
    
    * Add an additional test
    
    * Revert the abstract class
---
 extensions-core/avro-extensions/pom.xml            |  46 -----
 .../druid/data/input/AvroHadoopInputRowParser.java |  15 +-
 .../druid/data/input/AvroStreamInputRowParser.java |   2 +-
 .../druid/data/input/avro/AvroFlattenerMaker.java  |  30 ++-
 .../apache/druid/data/input/avro/AvroParsers.java  |   5 +-
 .../data/input/avro/GenericAvroJsonProvider.java   |   3 +
 .../avro/SchemaRegistryBasedAvroBytesDecoder.java  |   3 +-
 .../avro/SchemaRepoBasedAvroBytesDecoder.java      |   9 +-
 .../Avro1124RESTRepositoryClientWrapper.java       |   4 +-
 .../schemarepo/Avro1124SubjectAndIdConverter.java  |   3 +-
 .../data/input/AvroHadoopInputRowParserTest.java   | 105 +++--------
 .../data/input/AvroStreamInputRowParserTest.java   |  90 ++++-----
 .../data/input/avro/AvroFlattenerMakerTest.java    | 203 +++++++++++++++++++++
 .../SchemaRegistryBasedAvroBytesDecoderTest.java   |   5 +-
 .../avro/ParquetAvroHadoopInputRowParser.java      |   4 +-
 15 files changed, 312 insertions(+), 215 deletions(-)

diff --git a/extensions-core/avro-extensions/pom.xml b/extensions-core/avro-extensions/pom.xml
index aa18018..e839399 100644
--- a/extensions-core/avro-extensions/pom.xml
+++ b/extensions-core/avro-extensions/pom.xml
@@ -38,7 +38,6 @@
     <schemarepo.version>0.1.3</schemarepo.version>
     <confluent.version>3.0.1</confluent.version>
     <avro.version>1.8.2</avro.version>
-    <pig.version>0.15.0</pig.version>
   </properties>
 
   <repositories>
@@ -161,51 +160,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.pig</groupId>
-      <artifactId>pig</artifactId>
-      <version>${pig.version}</version>
-      <classifier>h2</classifier>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>avro</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>commons-net</groupId>
-          <artifactId>commons-net</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-core-asl</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-mapper-asl</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.pig</groupId>
-      <artifactId>piggybank</artifactId>
-      <version>${pig.version}</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.pig</groupId>
-          <artifactId>pig</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>avro</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
       <groupId>org.apache.druid</groupId>
       <artifactId>druid-processing</artifactId>
       <version>${project.parent.version}</version>
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
index 39ce48e..7d7c8eb 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
@@ -33,19 +33,16 @@ import java.util.List;
 public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
 {
   private final ParseSpec parseSpec;
-  private final boolean fromPigAvroStorage;
   private final ObjectFlattener<GenericRecord> avroFlattener;
   private final MapInputRowParser mapParser;
 
   @JsonCreator
   public AvroHadoopInputRowParser(
-      @JsonProperty("parseSpec") ParseSpec parseSpec,
-      @JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage
+      @JsonProperty("parseSpec") ParseSpec parseSpec
   )
   {
     this.parseSpec = parseSpec;
-    this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
-    this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false);
+    this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false);
     this.mapParser = new MapInputRowParser(parseSpec);
   }
 
@@ -62,15 +59,9 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
     return parseSpec;
   }
 
-  @JsonProperty
-  public boolean isFromPigAvroStorage()
-  {
-    return fromPigAvroStorage;
-  }
-
   @Override
   public InputRowParser withParseSpec(ParseSpec parseSpec)
   {
-    return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage);
+    return new AvroHadoopInputRowParser(parseSpec);
   }
 }
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
index f375c63..7dddf23 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
@@ -48,7 +48,7 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
   {
     this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
     this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
-    this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false);
+    this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false);
     this.mapParser = new MapInputRowParser(parseSpec);
   }
 
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
index 0b55d76..a142dca 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
@@ -19,12 +19,10 @@
 
 package org.apache.druid.data.input.avro;
 
-import com.google.common.collect.Lists;
 import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.Option;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.druid.java.util.common.StringUtils;
@@ -41,7 +39,7 @@ import java.util.stream.Collectors;
 
 public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<GenericRecord>
 {
-  static final Configuration JSONPATH_CONFIGURATION =
+  private static final Configuration JSONPATH_CONFIGURATION =
       Configuration.builder()
                    .jsonProvider(new GenericAvroJsonProvider())
                    .mappingProvider(new NotImplementedMappingProvider())
@@ -57,17 +55,17 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
       Schema.Type.DOUBLE
   );
 
-  public static boolean isPrimitive(Schema schema)
+  private static boolean isPrimitive(Schema schema)
   {
     return ROOT_TYPES.contains(schema.getType());
   }
 
-  public static boolean isPrimitiveArray(Schema schema)
+  private static boolean isPrimitiveArray(Schema schema)
   {
     return schema.getType().equals(Schema.Type.ARRAY) && isPrimitive(schema.getElementType());
   }
 
-  public static boolean isOptionalPrimitive(Schema schema)
+  private static boolean isOptionalPrimitive(Schema schema)
   {
     return schema.getType().equals(Schema.Type.UNION) &&
            schema.getTypes().size() == 2 &&
@@ -79,7 +77,7 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
            );
   }
 
-  static boolean isFieldPrimitive(Schema.Field field)
+  private static boolean isFieldPrimitive(Schema.Field field)
   {
     return isPrimitive(field.schema()) ||
            isPrimitiveArray(field.schema()) ||
@@ -87,12 +85,13 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
   }
 
 
-  private final boolean fromPigAvroStorage;
   private final boolean binaryAsString;
 
-  public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString)
+  /**
+   * @param binaryAsString boolean to encode the byte[] as a string.
+   */
+  public AvroFlattenerMaker(final boolean binaryAsString)
   {
-    this.fromPigAvroStorage = fromPigAvroStorage;
     this.binaryAsString = binaryAsString;
   }
 
@@ -128,21 +127,16 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
 
   private Object transformValue(final Object field)
   {
-    if (fromPigAvroStorage && field instanceof GenericData.Array) {
-      return Lists.transform((List) field, item -> String.valueOf(((GenericRecord) item).get(0)));
-    }
     if (field instanceof ByteBuffer) {
       if (binaryAsString) {
         return StringUtils.fromUtf8(((ByteBuffer) field).array());
       } else {
         return ((ByteBuffer) field).array();
       }
-    }
-    if (field instanceof Utf8) {
+    } else if (field instanceof Utf8) {
       return field.toString();
-    }
-    if (field instanceof List) {
-      return ((List) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
+    } else if (field instanceof List) {
+      return ((List<?>) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
     }
     return field;
   }
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
index 92ea3ae..a8baa42 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
@@ -38,18 +38,17 @@ public class AvroParsers
 
   public static ObjectFlattener<GenericRecord> makeFlattener(
       final ParseSpec parseSpec,
-      final boolean fromPigAvroStorage,
       final boolean binaryAsString
   )
   {
     final JSONPathSpec flattenSpec;
-    if (parseSpec != null && (parseSpec instanceof AvroParseSpec)) {
+    if (parseSpec instanceof AvroParseSpec) {
       flattenSpec = ((AvroParseSpec) parseSpec).getFlattenSpec();
     } else {
       flattenSpec = JSONPathSpec.DEFAULT;
     }
 
-    return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString));
+    return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(binaryAsString));
   }
 
   public static List<InputRow> parseGenericRecord(
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
index bfb6e22..42195ca 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
@@ -25,6 +25,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
 
+import javax.annotation.Nullable;
+
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -139,6 +141,7 @@ public class GenericAvroJsonProvider implements JsonProvider
     }
   }
 
+  @Nullable
   @Override
   public Object getMapValue(final Object o, final String s)
   {
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
index 5b9d386..6ff97c4 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
@@ -32,6 +32,7 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.druid.java.util.common.parsers.ParseException;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
 {
@@ -83,7 +84,7 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
 
     SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o;
 
-    return registry != null ? registry.equals(that.registry) : that.registry == null;
+    return Objects.equals(registry, that.registry);
   }
 
   @Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java
index e7de115..326b58a 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java
@@ -38,6 +38,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.Objects;
 
 public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDecoder
 {
@@ -107,14 +108,10 @@ public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDe
 
     SchemaRepoBasedAvroBytesDecoder<?, ?> that = (SchemaRepoBasedAvroBytesDecoder<?, ?>) o;
 
-    if (subjectAndIdConverter != null
-        ? !subjectAndIdConverter.equals(that.subjectAndIdConverter)
-        : that.subjectAndIdConverter != null) {
+    if (!Objects.equals(subjectAndIdConverter, that.subjectAndIdConverter)) {
       return false;
     }
-    return !(schemaRepository != null
-        ? !schemaRepository.equals(that.schemaRepository)
-        : that.schemaRepository != null);
+    return Objects.equals(schemaRepository, that.schemaRepository);
   }
 
   @Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java
index 0ba5579..e5c2541 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.schemarepo.client.Avro1124RESTRepositoryClient;
 
+import java.util.Objects;
+
 public class Avro1124RESTRepositoryClientWrapper extends Avro1124RESTRepositoryClient
 {
   private final String url;
@@ -60,7 +62,7 @@ public class Avro1124RESTRepositoryClientWrapper extends Avro1124RESTRepositoryC
 
     Avro1124RESTRepositoryClientWrapper that = (Avro1124RESTRepositoryClientWrapper) o;
 
-    return !(url != null ? !url.equals(that.url) : that.url != null);
+    return Objects.equals(url, that.url);
   }
 
   @Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java
index 82f89f6..d2685f5 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java
@@ -27,6 +27,7 @@ import org.schemarepo.api.converter.IdentityConverter;
 import org.schemarepo.api.converter.IntegerConverter;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 /**
  * This implementation using injected Kafka topic name as subject name, and an integer as schema id. Before sending avro
@@ -88,7 +89,7 @@ public class Avro1124SubjectAndIdConverter implements SubjectAndIdConverter<Stri
 
     Avro1124SubjectAndIdConverter converter = (Avro1124SubjectAndIdConverter) o;
 
-    return !(topic != null ? !topic.equals(converter.topic) : converter.topic != null);
+    return Objects.equals(topic, converter.topic);
 
   }
 
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
index dc25e8a..082f5e7 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.data.input;
 
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.io.Closeables;
 import com.google.common.io.Files;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
@@ -29,12 +28,7 @@ import org.apache.avro.file.FileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.commons.io.FileUtils;
 import org.apache.druid.data.input.avro.AvroExtensionsModule;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecJob;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -59,102 +53,59 @@ public class AvroHadoopInputRowParserTest
   }
 
   @Test
-  public void testParseNotFromPigAvroStorage() throws IOException
+  public void testParseNotFromSpark() throws IOException
   {
-    testParse(buildSomeAvroDatum(), false);
+    testParse(buildSomeAvroDatum());
   }
 
   @Test
-  public void testParseFromPiggyBankAvroStorage() throws IOException
+  public void testParseFromSpark() throws IOException
   {
-    testParse(buildPiggyBankAvro(), false);
+    testParse(buildAvroFromFile());
   }
 
-  @Test
-  public void testParseFromPigAvroStorage() throws IOException
+  private void testParse(GenericRecord record) throws IOException
   {
-    testParse(buildPigAvro(), true);
-  }
-
-  private void testParse(GenericRecord record, boolean fromPigAvroStorage) throws IOException
-  {
-    AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(PARSE_SPEC, fromPigAvroStorage);
+    AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(PARSE_SPEC);
     AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
         jsonMapper.writeValueAsBytes(parser),
         AvroHadoopInputRowParser.class
     );
     InputRow inputRow = parser2.parseBatch(record).get(0);
-    assertInputRowCorrect(inputRow, DIMENSIONS, fromPigAvroStorage);
-  }
-
-
-  public static GenericRecord buildPigAvro() throws IOException
-  {
-    return buildPigAvro(buildSomeAvroDatum(), "AvroStorage", "AvroStorage");
+    assertInputRowCorrect(inputRow, DIMENSIONS);
   }
 
-  public static GenericRecord buildPiggyBankAvro() throws IOException
+  private static GenericRecord buildAvroFromFile() throws IOException
   {
-    return buildPigAvro(
-        buildSomeAvroDatum(),
-        "org.apache.pig.piggybank.storage.avro.AvroStorage",
-        "org.apache.pig.piggybank.storage.avro.AvroStorage('field7','{\"type\":\"map\",\"values\":\"int\"}','field8','{\"type\":\"map\",\"values\":\"string\"}')"
+    return buildAvroFromFile(
+        buildSomeAvroDatum()
     );
   }
 
-  private static GenericRecord buildPigAvro(GenericRecord datum, String inputStorage, String outputStorage)
+  private static GenericRecord buildAvroFromFile(GenericRecord datum)
       throws IOException
   {
     final File tmpDir = Files.createTempDir();
-    FileReader<GenericRecord> reader = null;
-    PigServer pigServer = null;
-    try {
-      // 0. write avro object into temp file.
-      File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro");
-      DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(
-          new SpecificDatumWriter<>()
-      );
+
+    // 0. write avro object into temp file.
+    File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro");
+    try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(
+        new SpecificDatumWriter<>()
+    )) {
       dataFileWriter.create(SomeAvroDatum.getClassSchema(), someAvroDatumFile);
       dataFileWriter.append(datum);
-      dataFileWriter.close();
-
-      // 1. read avro files into Pig
-      pigServer = new PigServer(ExecType.LOCAL);
-      pigServer.registerQuery(
-          StringUtils.format(
-              "A = LOAD '%s' USING %s;",
-              someAvroDatumFile,
-              inputStorage
-          )
-      );
-
-      // 2. write new avro file using AvroStorage
-      File outputDir = new File(tmpDir, "output");
-      ExecJob job = pigServer.store("A", String.valueOf(outputDir), outputStorage);
-
-      while (!job.hasCompleted()) {
-        Thread.sleep(100);
-      }
-
-      assert (job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-
-      // 3. read avro object from AvroStorage
-      reader = DataFileReader.openReader(
-          new File(outputDir, "part-m-00000.avro"),
-          new GenericDatumReader<GenericRecord>()
-      );
-
-      return reader.next();
     }
-    catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-    finally {
-      if (pigServer != null) {
-        pigServer.shutdown();
-      }
-      Closeables.close(reader, true);
-      FileUtils.deleteDirectory(tmpDir);
+
+    final GenericRecord record;
+    // 3. read avro object from AvroStorage
+    try (FileReader<GenericRecord> reader = DataFileReader.openReader(
+        someAvroDatumFile,
+        new GenericDatumReader<>()
+    )) {
+      record = reader.next();
     }
+
+    return record;
   }
+
 }
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
index 184c8b2..cadf7a9 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
@@ -55,6 +55,7 @@ import org.schemarepo.api.converter.AvroSchemaConverter;
 import org.schemarepo.api.converter.IdentityConverter;
 import org.schemarepo.api.converter.IntegerConverter;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -71,20 +72,20 @@ import static org.junit.Assert.assertEquals;
 
 public class AvroStreamInputRowParserTest
 {
-  public static final String EVENT_TYPE = "eventType";
-  public static final String ID = "id";
-  public static final String SOME_OTHER_ID = "someOtherId";
-  public static final String IS_VALID = "isValid";
-  public static final String TOPIC = "aTopic";
-  public static final String EVENT_TYPE_VALUE = "type-a";
-  public static final long ID_VALUE = 1976491L;
-  public static final long SOME_OTHER_ID_VALUE = 6568719896L;
-  public static final float SOME_FLOAT_VALUE = 0.23555f;
-  public static final int SOME_INT_VALUE = 1;
-  public static final long SOME_LONG_VALUE = 679865987569912369L;
-  public static final DateTime DATE_TIME = new DateTime(2015, 10, 25, 19, 30, ISOChronology.getInstanceUTC());
-  public static final List<String> DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID);
-  public static final List<String> DIMENSIONS_SCHEMALESS = Arrays.asList(
+  private static final String EVENT_TYPE = "eventType";
+  private static final String ID = "id";
+  private static final String SOME_OTHER_ID = "someOtherId";
+  private static final String IS_VALID = "isValid";
+  private static final String TOPIC = "aTopic";
+  private static final String EVENT_TYPE_VALUE = "type-a";
+  private static final long ID_VALUE = 1976491L;
+  private static final long SOME_OTHER_ID_VALUE = 6568719896L;
+  private static final float SOME_FLOAT_VALUE = 0.23555f;
+  private static final int SOME_INT_VALUE = 1;
+  private static final long SOME_LONG_VALUE = 679865987569912369L;
+  private static final DateTime DATE_TIME = new DateTime(2015, 10, 25, 19, 30, ISOChronology.getInstanceUTC());
+  static final List<String> DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID);
+  private static final List<String> DIMENSIONS_SCHEMALESS = Arrays.asList(
       "nested",
       SOME_OTHER_ID,
       "someStringArray",
@@ -98,7 +99,7 @@ public class AvroStreamInputRowParserTest
       "someInt",
       "timestamp"
   );
-  public static final AvroParseSpec PARSE_SPEC = new AvroParseSpec(
+  static final AvroParseSpec PARSE_SPEC = new AvroParseSpec(
       new TimestampSpec("nested", "millis", null),
       new DimensionsSpec(DimensionsSpec.getDefaultSchemas(DIMENSIONS), Collections.emptyList(), null),
       new JSONPathSpec(
@@ -108,7 +109,7 @@ public class AvroStreamInputRowParserTest
           )
       )
   );
-  public static final AvroParseSpec PARSE_SPEC_SCHEMALESS = new AvroParseSpec(
+  private static final AvroParseSpec PARSE_SPEC_SCHEMALESS = new AvroParseSpec(
       new TimestampSpec("nested", "millis", null),
       new DimensionsSpec(null, null, null),
       new JSONPathSpec(
@@ -118,19 +119,19 @@ public class AvroStreamInputRowParserTest
           )
       )
   );
-  public static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array());
+  private static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array());
   private static final long SUB_LONG_VALUE = 1543698L;
   private static final int SUB_INT_VALUE = 4892;
-  public static final MySubRecord SOME_RECORD_VALUE = MySubRecord.newBuilder()
+  private static final MySubRecord SOME_RECORD_VALUE = MySubRecord.newBuilder()
                                                                  .setSubInt(SUB_INT_VALUE)
                                                                  .setSubLong(SUB_LONG_VALUE)
                                                                  .build();
-  public static final List<CharSequence> SOME_STRING_ARRAY_VALUE = Arrays.asList((CharSequence) "8", "4", "2", "1");
-  public static final List<Integer> SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8);
-  public static final Map<CharSequence, Integer> SOME_INT_VALUE_MAP_VALUE = Maps.asMap(
-      new HashSet<CharSequence>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, Integer>()
+  private static final List<CharSequence> SOME_STRING_ARRAY_VALUE = Arrays.asList("8", "4", "2", "1");
+  private static final List<Integer> SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8);
+  private static final Map<CharSequence, Integer> SOME_INT_VALUE_MAP_VALUE = Maps.asMap(
+      new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, Integer>()
       {
-        @Nullable
+        @Nonnull
         @Override
         public Integer apply(@Nullable CharSequence input)
         {
@@ -138,10 +139,10 @@ public class AvroStreamInputRowParserTest
         }
       }
   );
-  public static final Map<CharSequence, CharSequence> SOME_STRING_VALUE_MAP_VALUE = Maps.asMap(
-      new HashSet<CharSequence>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, CharSequence>()
+  private static final Map<CharSequence, CharSequence> SOME_STRING_VALUE_MAP_VALUE = Maps.asMap(
+      new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, CharSequence>()
       {
-        @Nullable
+        @Nonnull
         @Override
         public CharSequence apply(@Nullable CharSequence input)
         {
@@ -149,8 +150,8 @@ public class AvroStreamInputRowParserTest
         }
       }
   );
-  public static final String SOME_UNION_VALUE = "string as union";
-  public static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8);
+  private static final String SOME_UNION_VALUE = "string as union";
+  private static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8);
 
   private static final Pattern BRACES_AND_SPACE = Pattern.compile("[{} ]");
 
@@ -173,7 +174,7 @@ public class AvroStreamInputRowParserTest
     Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
     AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
         PARSE_SPEC,
-        new SchemaRepoBasedAvroBytesDecoder<String, Integer>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
     );
     ByteBufferInputRowParser parser2 = jsonMapper.readValue(
         jsonMapper.writeValueAsString(parser),
@@ -190,7 +191,7 @@ public class AvroStreamInputRowParserTest
     Repository repository = new InMemoryRepository(null);
     AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
         PARSE_SPEC,
-        new SchemaRepoBasedAvroBytesDecoder<String, Integer>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
     );
     ByteBufferInputRowParser parser2 = jsonMapper.readValue(
         jsonMapper.writeValueAsString(parser),
@@ -221,7 +222,7 @@ public class AvroStreamInputRowParserTest
 
     InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
 
-    assertInputRowCorrect(inputRow, DIMENSIONS, false);
+    assertInputRowCorrect(inputRow, DIMENSIONS);
   }
 
   @Test
@@ -231,7 +232,7 @@ public class AvroStreamInputRowParserTest
     Repository repository = new InMemoryRepository(null);
     AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
         PARSE_SPEC_SCHEMALESS,
-        new SchemaRepoBasedAvroBytesDecoder<String, Integer>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+        new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
     );
     ByteBufferInputRowParser parser2 = jsonMapper.readValue(
         jsonMapper.writeValueAsString(parser),
@@ -244,7 +245,7 @@ public class AvroStreamInputRowParserTest
 
     // encode schema id
     Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC);
-    TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<Integer, Schema, String>(
+    TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<>(
         repository,
         new IntegerConverter(),
         new AvroSchemaConverter(),
@@ -253,19 +254,20 @@ public class AvroStreamInputRowParserTest
     Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema());
     ByteBuffer byteBuffer = ByteBuffer.allocate(4);
     converter.putSubjectAndId(id, byteBuffer);
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    out.write(byteBuffer.array());
-    // encode data
-    DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema());
-    // write avro datum to bytes
-    writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      out.write(byteBuffer.array());
+      // encode data
+      DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema());
+      // write avro datum to bytes
+      writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
 
-    InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
+      InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
 
-    assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS, false);
+      assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS);
+    }
   }
 
-  public static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimensions, boolean isFromPigAvro)
+  static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimensions)
   {
     assertEquals(expectedDimensions, inputRow.getDimensions());
     assertEquals(1543698L, inputRow.getTimestampFromEpoch());
@@ -316,9 +318,7 @@ public class AvroStreamInputRowParserTest
     );
     assertEquals(Collections.singletonList(SOME_UNION_VALUE), inputRow.getDimension("someUnion"));
     assertEquals(Collections.emptyList(), inputRow.getDimension("someNull"));
-    if (isFromPigAvro) {
-      assertEquals(String.valueOf(SOME_FIXED_VALUE), Arrays.toString((byte[]) inputRow.getRaw("someFixed")));
-    }
+    assertEquals(SOME_FIXED_VALUE, inputRow.getRaw("someFixed"));
     assertEquals(
         Arrays.toString(SOME_BYTES_VALUE.array()),
         Arrays.toString((byte[]) (inputRow.getRaw("someBytes")))
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
new file mode 100644
index 0000000..675a9ce
--- /dev/null
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.avro;
+
+import org.apache.druid.data.input.AvroStreamInputRowParserTest;
+import org.apache.druid.data.input.SomeAvroDatum;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AvroFlattenerMakerTest
+{
+
+  @Test
+  public void getRootField()
+  {
+    final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
+    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false);
+
+    Assert.assertEquals(
+        record.timestamp,
+        flattener.getRootField(record, "timestamp")
+    );
+    Assert.assertEquals(
+        record.eventType,
+        flattener.getRootField(record, "eventType")
+    );
+    Assert.assertEquals(
+        record.id,
+        flattener.getRootField(record, "id")
+    );
+    Assert.assertEquals(
+        record.someOtherId,
+        flattener.getRootField(record, "someOtherId")
+    );
+    Assert.assertEquals(
+        record.isValid,
+        flattener.getRootField(record, "isValid")
+    );
+    Assert.assertEquals(
+        record.someIntArray,
+        flattener.getRootField(record, "someIntArray")
+    );
+    Assert.assertEquals(
+        record.someStringArray,
+        flattener.getRootField(record, "someStringArray")
+    );
+    Assert.assertEquals(
+        record.someIntValueMap,
+        flattener.getRootField(record, "someIntValueMap")
+    );
+    Assert.assertEquals(
+        record.someStringValueMap,
+        flattener.getRootField(record, "someStringValueMap")
+    );
+    Assert.assertEquals(
+        record.someUnion,
+        flattener.getRootField(record, "someUnion")
+    );
+    Assert.assertEquals(
+        record.someNull,
+        flattener.getRootField(record, "someNull")
+    );
+    Assert.assertEquals(
+        record.someFixed,
+        flattener.getRootField(record, "someFixed")
+    );
+    Assert.assertEquals(
+        // Converted to an array by transformValue
+        record.someBytes.array(),
+        flattener.getRootField(record, "someBytes")
+    );
+    Assert.assertEquals(
+        record.someEnum,
+        flattener.getRootField(record, "someEnum")
+    );
+    Assert.assertEquals(
+        record.someRecord,
+        flattener.getRootField(record, "someRecord")
+    );
+    Assert.assertEquals(
+        record.someLong,
+        flattener.getRootField(record, "someLong")
+    );
+    Assert.assertEquals(
+        record.someInt,
+        flattener.getRootField(record, "someInt")
+    );
+    Assert.assertEquals(
+        record.someFloat,
+        flattener.getRootField(record, "someFloat")
+    );
+  }
+
+  @Test
+  public void makeJsonPathExtractor()
+  {
+    final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
+    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false);
+
+    Assert.assertEquals(
+        record.timestamp,
+        flattener.makeJsonPathExtractor("$.timestamp").apply(record)
+    );
+    Assert.assertEquals(
+        record.eventType,
+        flattener.makeJsonPathExtractor("$.eventType").apply(record)
+    );
+    Assert.assertEquals(
+        record.id,
+        flattener.makeJsonPathExtractor("$.id").apply(record)
+    );
+    Assert.assertEquals(
+        record.someOtherId,
+        flattener.makeJsonPathExtractor("$.someOtherId").apply(record)
+    );
+    Assert.assertEquals(
+        record.isValid,
+        flattener.makeJsonPathExtractor("$.isValid").apply(record)
+    );
+    Assert.assertEquals(
+        record.someIntArray,
+        flattener.makeJsonPathExtractor("$.someIntArray").apply(record)
+    );
+    Assert.assertEquals(
+        record.someStringArray,
+        flattener.makeJsonPathExtractor("$.someStringArray").apply(record)
+    );
+    Assert.assertEquals(
+        record.someIntValueMap,
+        flattener.makeJsonPathExtractor("$.someIntValueMap").apply(record)
+    );
+    Assert.assertEquals(
+        record.someStringValueMap,
+        flattener.makeJsonPathExtractor("$.someStringValueMap").apply(record)
+    );
+    Assert.assertEquals(
+        record.someUnion,
+        flattener.makeJsonPathExtractor("$.someUnion").apply(record)
+    );
+    Assert.assertEquals(
+        record.someNull,
+        flattener.makeJsonPathExtractor("$.someNull").apply(record)
+    );
+    Assert.assertEquals(
+        record.someFixed,
+        flattener.makeJsonPathExtractor("$.someFixed").apply(record)
+    );
+    Assert.assertEquals(
+        // Converted to an array by transformValue
+        record.someBytes.array(),
+        flattener.makeJsonPathExtractor("$.someBytes").apply(record)
+    );
+    Assert.assertEquals(
+        record.someEnum,
+        flattener.makeJsonPathExtractor("$.someEnum").apply(record)
+    );
+    Assert.assertEquals(
+        record.someRecord,
+        flattener.makeJsonPathExtractor("$.someRecord").apply(record)
+    );
+    Assert.assertEquals(
+        record.someLong,
+        flattener.makeJsonPathExtractor("$.someLong").apply(record)
+    );
+    Assert.assertEquals(
+        record.someInt,
+        flattener.makeJsonPathExtractor("$.someInt").apply(record)
+    );
+    Assert.assertEquals(
+        record.someFloat,
+        flattener.makeJsonPathExtractor("$.someFloat").apply(record)
+    );
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void makeJsonQueryExtractor()
+  {
+    final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
+    final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false);
+
+    Assert.assertEquals(
+        record.timestamp,
+        flattener.makeJsonQueryExtractor("$.timestamp").apply(record)
+    );
+  }
+}
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
index 911819d..5481c0e 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
@@ -45,8 +45,7 @@ import static org.mockito.Mockito.when;
  */
 public class SchemaRegistryBasedAvroBytesDecoderTest
 {
-
-  SchemaRegistryClient registry;
+  private SchemaRegistryClient registry;
 
   @Before
   public void setUp()
@@ -96,7 +95,7 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
     new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
   }
 
-  byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException
+  private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException
   {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema);
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
index 1658d17..3c63eb2 100755
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
@@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
@@ -74,7 +75,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
 
     this.recordFlattener = ObjectFlatteners.create(
         flattenSpec,
-        new AvroFlattenerMaker(false, this.binaryAsString)
+        new AvroFlattenerMaker(this.binaryAsString)
     );
   }
 
@@ -92,6 +93,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
   /**
    * imitate avro extension {@link org.apache.druid.data.input.avro.AvroParsers#parseGenericRecord}
    */
+  @Nonnull
   @Override
   public List<InputRow> parseBatch(GenericRecord record)
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org