You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/06/14 21:19:08 UTC
[incubator-druid] branch master updated: Remove Apache Pig from the
tests (#7810)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new f581118 Remove Apache Pig from the tests (#7810)
f581118 is described below
commit f581118f05a8dcbd5e642d03025fc125bcc0bad8
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Fri Jun 14 23:18:58 2019 +0200
Remove Apache Pig from the tests (#7810)
* Remove Apache Pig from the tests
* Remove the Pig specific part
* Fix the Checkstyle issues
* Cleanup a bit
* Add an additional test
* Revert the abstract class
---
extensions-core/avro-extensions/pom.xml | 46 -----
.../druid/data/input/AvroHadoopInputRowParser.java | 15 +-
.../druid/data/input/AvroStreamInputRowParser.java | 2 +-
.../druid/data/input/avro/AvroFlattenerMaker.java | 30 ++-
.../apache/druid/data/input/avro/AvroParsers.java | 5 +-
.../data/input/avro/GenericAvroJsonProvider.java | 3 +
.../avro/SchemaRegistryBasedAvroBytesDecoder.java | 3 +-
.../avro/SchemaRepoBasedAvroBytesDecoder.java | 9 +-
.../Avro1124RESTRepositoryClientWrapper.java | 4 +-
.../schemarepo/Avro1124SubjectAndIdConverter.java | 3 +-
.../data/input/AvroHadoopInputRowParserTest.java | 105 +++--------
.../data/input/AvroStreamInputRowParserTest.java | 90 ++++-----
.../data/input/avro/AvroFlattenerMakerTest.java | 203 +++++++++++++++++++++
.../SchemaRegistryBasedAvroBytesDecoderTest.java | 5 +-
.../avro/ParquetAvroHadoopInputRowParser.java | 4 +-
15 files changed, 312 insertions(+), 215 deletions(-)
diff --git a/extensions-core/avro-extensions/pom.xml b/extensions-core/avro-extensions/pom.xml
index aa18018..e839399 100644
--- a/extensions-core/avro-extensions/pom.xml
+++ b/extensions-core/avro-extensions/pom.xml
@@ -38,7 +38,6 @@
<schemarepo.version>0.1.3</schemarepo.version>
<confluent.version>3.0.1</confluent.version>
<avro.version>1.8.2</avro.version>
- <pig.version>0.15.0</pig.version>
</properties>
<repositories>
@@ -161,51 +160,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.pig</groupId>
- <artifactId>pig</artifactId>
- <version>${pig.version}</version>
- <classifier>h2</classifier>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-net</groupId>
- <artifactId>commons-net</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.pig</groupId>
- <artifactId>piggybank</artifactId>
- <version>${pig.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.pig</groupId>
- <artifactId>pig</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
index 39ce48e..7d7c8eb 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java
@@ -33,19 +33,16 @@ import java.util.List;
public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
{
private final ParseSpec parseSpec;
- private final boolean fromPigAvroStorage;
private final ObjectFlattener<GenericRecord> avroFlattener;
private final MapInputRowParser mapParser;
@JsonCreator
public AvroHadoopInputRowParser(
- @JsonProperty("parseSpec") ParseSpec parseSpec,
- @JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage
+ @JsonProperty("parseSpec") ParseSpec parseSpec
)
{
this.parseSpec = parseSpec;
- this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
- this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false);
+ this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false);
this.mapParser = new MapInputRowParser(parseSpec);
}
@@ -62,15 +59,9 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
return parseSpec;
}
- @JsonProperty
- public boolean isFromPigAvroStorage()
- {
- return fromPigAvroStorage;
- }
-
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
- return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage);
+ return new AvroHadoopInputRowParser(parseSpec);
}
}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
index f375c63..7dddf23 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java
@@ -48,7 +48,7 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
{
this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
- this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false);
+ this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false);
this.mapParser = new MapInputRowParser(parseSpec);
}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
index 0b55d76..a142dca 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java
@@ -19,12 +19,10 @@
package org.apache.druid.data.input.avro;
-import com.google.common.collect.Lists;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.druid.java.util.common.StringUtils;
@@ -41,7 +39,7 @@ import java.util.stream.Collectors;
public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<GenericRecord>
{
- static final Configuration JSONPATH_CONFIGURATION =
+ private static final Configuration JSONPATH_CONFIGURATION =
Configuration.builder()
.jsonProvider(new GenericAvroJsonProvider())
.mappingProvider(new NotImplementedMappingProvider())
@@ -57,17 +55,17 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
Schema.Type.DOUBLE
);
- public static boolean isPrimitive(Schema schema)
+ private static boolean isPrimitive(Schema schema)
{
return ROOT_TYPES.contains(schema.getType());
}
- public static boolean isPrimitiveArray(Schema schema)
+ private static boolean isPrimitiveArray(Schema schema)
{
return schema.getType().equals(Schema.Type.ARRAY) && isPrimitive(schema.getElementType());
}
- public static boolean isOptionalPrimitive(Schema schema)
+ private static boolean isOptionalPrimitive(Schema schema)
{
return schema.getType().equals(Schema.Type.UNION) &&
schema.getTypes().size() == 2 &&
@@ -79,7 +77,7 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
);
}
- static boolean isFieldPrimitive(Schema.Field field)
+ private static boolean isFieldPrimitive(Schema.Field field)
{
return isPrimitive(field.schema()) ||
isPrimitiveArray(field.schema()) ||
@@ -87,12 +85,13 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
}
- private final boolean fromPigAvroStorage;
private final boolean binaryAsString;
- public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString)
+ /**
+ * @param binaryAsString boolean to encode the byte[] as a string.
+ */
+ public AvroFlattenerMaker(final boolean binaryAsString)
{
- this.fromPigAvroStorage = fromPigAvroStorage;
this.binaryAsString = binaryAsString;
}
@@ -128,21 +127,16 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
private Object transformValue(final Object field)
{
- if (fromPigAvroStorage && field instanceof GenericData.Array) {
- return Lists.transform((List) field, item -> String.valueOf(((GenericRecord) item).get(0)));
- }
if (field instanceof ByteBuffer) {
if (binaryAsString) {
return StringUtils.fromUtf8(((ByteBuffer) field).array());
} else {
return ((ByteBuffer) field).array();
}
- }
- if (field instanceof Utf8) {
+ } else if (field instanceof Utf8) {
return field.toString();
- }
- if (field instanceof List) {
- return ((List) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
+ } else if (field instanceof List) {
+ return ((List<?>) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
}
return field;
}
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
index 92ea3ae..a8baa42 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java
@@ -38,18 +38,17 @@ public class AvroParsers
public static ObjectFlattener<GenericRecord> makeFlattener(
final ParseSpec parseSpec,
- final boolean fromPigAvroStorage,
final boolean binaryAsString
)
{
final JSONPathSpec flattenSpec;
- if (parseSpec != null && (parseSpec instanceof AvroParseSpec)) {
+ if (parseSpec instanceof AvroParseSpec) {
flattenSpec = ((AvroParseSpec) parseSpec).getFlattenSpec();
} else {
flattenSpec = JSONPathSpec.DEFAULT;
}
- return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString));
+ return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(binaryAsString));
}
public static List<InputRow> parseGenericRecord(
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
index bfb6e22..42195ca 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java
@@ -25,6 +25,8 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
+import javax.annotation.Nullable;
+
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
@@ -139,6 +141,7 @@ public class GenericAvroJsonProvider implements JsonProvider
}
}
+ @Nullable
@Override
public Object getMapValue(final Object o, final String s)
{
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
index 5b9d386..6ff97c4 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java
@@ -32,6 +32,7 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.druid.java.util.common.parsers.ParseException;
import java.nio.ByteBuffer;
+import java.util.Objects;
public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
{
@@ -83,7 +84,7 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o;
- return registry != null ? registry.equals(that.registry) : that.registry == null;
+ return Objects.equals(registry, that.registry);
}
@Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java
index e7de115..326b58a 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java
@@ -38,6 +38,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.Objects;
public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDecoder
{
@@ -107,14 +108,10 @@ public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDe
SchemaRepoBasedAvroBytesDecoder<?, ?> that = (SchemaRepoBasedAvroBytesDecoder<?, ?>) o;
- if (subjectAndIdConverter != null
- ? !subjectAndIdConverter.equals(that.subjectAndIdConverter)
- : that.subjectAndIdConverter != null) {
+ if (!Objects.equals(subjectAndIdConverter, that.subjectAndIdConverter)) {
return false;
}
- return !(schemaRepository != null
- ? !schemaRepository.equals(that.schemaRepository)
- : that.schemaRepository != null);
+ return Objects.equals(schemaRepository, that.schemaRepository);
}
@Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java
index 0ba5579..e5c2541 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.schemarepo.client.Avro1124RESTRepositoryClient;
+import java.util.Objects;
+
public class Avro1124RESTRepositoryClientWrapper extends Avro1124RESTRepositoryClient
{
private final String url;
@@ -60,7 +62,7 @@ public class Avro1124RESTRepositoryClientWrapper extends Avro1124RESTRepositoryC
Avro1124RESTRepositoryClientWrapper that = (Avro1124RESTRepositoryClientWrapper) o;
- return !(url != null ? !url.equals(that.url) : that.url != null);
+ return Objects.equals(url, that.url);
}
@Override
diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java
index 82f89f6..d2685f5 100644
--- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java
+++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java
@@ -27,6 +27,7 @@ import org.schemarepo.api.converter.IdentityConverter;
import org.schemarepo.api.converter.IntegerConverter;
import java.nio.ByteBuffer;
+import java.util.Objects;
/**
* This implementation using injected Kafka topic name as subject name, and an integer as schema id. Before sending avro
@@ -88,7 +89,7 @@ public class Avro1124SubjectAndIdConverter implements SubjectAndIdConverter<Stri
Avro1124SubjectAndIdConverter converter = (Avro1124SubjectAndIdConverter) o;
- return !(topic != null ? !topic.equals(converter.topic) : converter.topic != null);
+ return Objects.equals(topic, converter.topic);
}
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
index dc25e8a..082f5e7 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.data.input;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.io.Closeables;
import com.google.common.io.Files;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
@@ -29,12 +28,7 @@ import org.apache.avro.file.FileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.avro.AvroExtensionsModule;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecJob;
import org.junit.Before;
import org.junit.Test;
@@ -59,102 +53,59 @@ public class AvroHadoopInputRowParserTest
}
@Test
- public void testParseNotFromPigAvroStorage() throws IOException
+ public void testParseNotFromSpark() throws IOException
{
- testParse(buildSomeAvroDatum(), false);
+ testParse(buildSomeAvroDatum());
}
@Test
- public void testParseFromPiggyBankAvroStorage() throws IOException
+ public void testParseFromSpark() throws IOException
{
- testParse(buildPiggyBankAvro(), false);
+ testParse(buildAvroFromFile());
}
- @Test
- public void testParseFromPigAvroStorage() throws IOException
+ private void testParse(GenericRecord record) throws IOException
{
- testParse(buildPigAvro(), true);
- }
-
- private void testParse(GenericRecord record, boolean fromPigAvroStorage) throws IOException
- {
- AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(PARSE_SPEC, fromPigAvroStorage);
+ AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(PARSE_SPEC);
AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
AvroHadoopInputRowParser.class
);
InputRow inputRow = parser2.parseBatch(record).get(0);
- assertInputRowCorrect(inputRow, DIMENSIONS, fromPigAvroStorage);
- }
-
-
- public static GenericRecord buildPigAvro() throws IOException
- {
- return buildPigAvro(buildSomeAvroDatum(), "AvroStorage", "AvroStorage");
+ assertInputRowCorrect(inputRow, DIMENSIONS);
}
- public static GenericRecord buildPiggyBankAvro() throws IOException
+ private static GenericRecord buildAvroFromFile() throws IOException
{
- return buildPigAvro(
- buildSomeAvroDatum(),
- "org.apache.pig.piggybank.storage.avro.AvroStorage",
- "org.apache.pig.piggybank.storage.avro.AvroStorage('field7','{\"type\":\"map\",\"values\":\"int\"}','field8','{\"type\":\"map\",\"values\":\"string\"}')"
+ return buildAvroFromFile(
+ buildSomeAvroDatum()
);
}
- private static GenericRecord buildPigAvro(GenericRecord datum, String inputStorage, String outputStorage)
+ private static GenericRecord buildAvroFromFile(GenericRecord datum)
throws IOException
{
final File tmpDir = Files.createTempDir();
- FileReader<GenericRecord> reader = null;
- PigServer pigServer = null;
- try {
- // 0. write avro object into temp file.
- File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro");
- DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(
- new SpecificDatumWriter<>()
- );
+
+ // 0. write avro object into temp file.
+ File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro");
+ try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(
+ new SpecificDatumWriter<>()
+ )) {
dataFileWriter.create(SomeAvroDatum.getClassSchema(), someAvroDatumFile);
dataFileWriter.append(datum);
- dataFileWriter.close();
-
- // 1. read avro files into Pig
- pigServer = new PigServer(ExecType.LOCAL);
- pigServer.registerQuery(
- StringUtils.format(
- "A = LOAD '%s' USING %s;",
- someAvroDatumFile,
- inputStorage
- )
- );
-
- // 2. write new avro file using AvroStorage
- File outputDir = new File(tmpDir, "output");
- ExecJob job = pigServer.store("A", String.valueOf(outputDir), outputStorage);
-
- while (!job.hasCompleted()) {
- Thread.sleep(100);
- }
-
- assert (job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
-
- // 3. read avro object from AvroStorage
- reader = DataFileReader.openReader(
- new File(outputDir, "part-m-00000.avro"),
- new GenericDatumReader<GenericRecord>()
- );
-
- return reader.next();
}
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- finally {
- if (pigServer != null) {
- pigServer.shutdown();
- }
- Closeables.close(reader, true);
- FileUtils.deleteDirectory(tmpDir);
+
+ final GenericRecord record;
+ // 3. read avro object from AvroStorage
+ try (FileReader<GenericRecord> reader = DataFileReader.openReader(
+ someAvroDatumFile,
+ new GenericDatumReader<>()
+ )) {
+ record = reader.next();
}
+
+ return record;
}
+
}
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
index 184c8b2..cadf7a9 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java
@@ -55,6 +55,7 @@ import org.schemarepo.api.converter.AvroSchemaConverter;
import org.schemarepo.api.converter.IdentityConverter;
import org.schemarepo.api.converter.IntegerConverter;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -71,20 +72,20 @@ import static org.junit.Assert.assertEquals;
public class AvroStreamInputRowParserTest
{
- public static final String EVENT_TYPE = "eventType";
- public static final String ID = "id";
- public static final String SOME_OTHER_ID = "someOtherId";
- public static final String IS_VALID = "isValid";
- public static final String TOPIC = "aTopic";
- public static final String EVENT_TYPE_VALUE = "type-a";
- public static final long ID_VALUE = 1976491L;
- public static final long SOME_OTHER_ID_VALUE = 6568719896L;
- public static final float SOME_FLOAT_VALUE = 0.23555f;
- public static final int SOME_INT_VALUE = 1;
- public static final long SOME_LONG_VALUE = 679865987569912369L;
- public static final DateTime DATE_TIME = new DateTime(2015, 10, 25, 19, 30, ISOChronology.getInstanceUTC());
- public static final List<String> DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID);
- public static final List<String> DIMENSIONS_SCHEMALESS = Arrays.asList(
+ private static final String EVENT_TYPE = "eventType";
+ private static final String ID = "id";
+ private static final String SOME_OTHER_ID = "someOtherId";
+ private static final String IS_VALID = "isValid";
+ private static final String TOPIC = "aTopic";
+ private static final String EVENT_TYPE_VALUE = "type-a";
+ private static final long ID_VALUE = 1976491L;
+ private static final long SOME_OTHER_ID_VALUE = 6568719896L;
+ private static final float SOME_FLOAT_VALUE = 0.23555f;
+ private static final int SOME_INT_VALUE = 1;
+ private static final long SOME_LONG_VALUE = 679865987569912369L;
+ private static final DateTime DATE_TIME = new DateTime(2015, 10, 25, 19, 30, ISOChronology.getInstanceUTC());
+ static final List<String> DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID);
+ private static final List<String> DIMENSIONS_SCHEMALESS = Arrays.asList(
"nested",
SOME_OTHER_ID,
"someStringArray",
@@ -98,7 +99,7 @@ public class AvroStreamInputRowParserTest
"someInt",
"timestamp"
);
- public static final AvroParseSpec PARSE_SPEC = new AvroParseSpec(
+ static final AvroParseSpec PARSE_SPEC = new AvroParseSpec(
new TimestampSpec("nested", "millis", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(DIMENSIONS), Collections.emptyList(), null),
new JSONPathSpec(
@@ -108,7 +109,7 @@ public class AvroStreamInputRowParserTest
)
)
);
- public static final AvroParseSpec PARSE_SPEC_SCHEMALESS = new AvroParseSpec(
+ private static final AvroParseSpec PARSE_SPEC_SCHEMALESS = new AvroParseSpec(
new TimestampSpec("nested", "millis", null),
new DimensionsSpec(null, null, null),
new JSONPathSpec(
@@ -118,19 +119,19 @@ public class AvroStreamInputRowParserTest
)
)
);
- public static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array());
+ private static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array());
private static final long SUB_LONG_VALUE = 1543698L;
private static final int SUB_INT_VALUE = 4892;
- public static final MySubRecord SOME_RECORD_VALUE = MySubRecord.newBuilder()
+ private static final MySubRecord SOME_RECORD_VALUE = MySubRecord.newBuilder()
.setSubInt(SUB_INT_VALUE)
.setSubLong(SUB_LONG_VALUE)
.build();
- public static final List<CharSequence> SOME_STRING_ARRAY_VALUE = Arrays.asList((CharSequence) "8", "4", "2", "1");
- public static final List<Integer> SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8);
- public static final Map<CharSequence, Integer> SOME_INT_VALUE_MAP_VALUE = Maps.asMap(
- new HashSet<CharSequence>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, Integer>()
+ private static final List<CharSequence> SOME_STRING_ARRAY_VALUE = Arrays.asList("8", "4", "2", "1");
+ private static final List<Integer> SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8);
+ private static final Map<CharSequence, Integer> SOME_INT_VALUE_MAP_VALUE = Maps.asMap(
+ new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, Integer>()
{
- @Nullable
+ @Nonnull
@Override
public Integer apply(@Nullable CharSequence input)
{
@@ -138,10 +139,10 @@ public class AvroStreamInputRowParserTest
}
}
);
- public static final Map<CharSequence, CharSequence> SOME_STRING_VALUE_MAP_VALUE = Maps.asMap(
- new HashSet<CharSequence>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, CharSequence>()
+ private static final Map<CharSequence, CharSequence> SOME_STRING_VALUE_MAP_VALUE = Maps.asMap(
+ new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, CharSequence>()
{
- @Nullable
+ @Nonnull
@Override
public CharSequence apply(@Nullable CharSequence input)
{
@@ -149,8 +150,8 @@ public class AvroStreamInputRowParserTest
}
}
);
- public static final String SOME_UNION_VALUE = "string as union";
- public static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8);
+ private static final String SOME_UNION_VALUE = "string as union";
+ private static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8);
private static final Pattern BRACES_AND_SPACE = Pattern.compile("[{} ]");
@@ -173,7 +174,7 @@ public class AvroStreamInputRowParserTest
Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC,
- new SchemaRepoBasedAvroBytesDecoder<String, Integer>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+ new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
);
ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser),
@@ -190,7 +191,7 @@ public class AvroStreamInputRowParserTest
Repository repository = new InMemoryRepository(null);
AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC,
- new SchemaRepoBasedAvroBytesDecoder<String, Integer>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+ new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
);
ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser),
@@ -221,7 +222,7 @@ public class AvroStreamInputRowParserTest
InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
- assertInputRowCorrect(inputRow, DIMENSIONS, false);
+ assertInputRowCorrect(inputRow, DIMENSIONS);
}
@Test
@@ -231,7 +232,7 @@ public class AvroStreamInputRowParserTest
Repository repository = new InMemoryRepository(null);
AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC_SCHEMALESS,
- new SchemaRepoBasedAvroBytesDecoder<String, Integer>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
+ new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
);
ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser),
@@ -244,7 +245,7 @@ public class AvroStreamInputRowParserTest
// encode schema id
Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC);
- TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<Integer, Schema, String>(
+ TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<>(
repository,
new IntegerConverter(),
new AvroSchemaConverter(),
@@ -253,19 +254,20 @@ public class AvroStreamInputRowParserTest
Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema());
ByteBuffer byteBuffer = ByteBuffer.allocate(4);
converter.putSubjectAndId(id, byteBuffer);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- out.write(byteBuffer.array());
- // encode data
- DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema());
- // write avro datum to bytes
- writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ out.write(byteBuffer.array());
+ // encode data
+ DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(someAvroDatum.getSchema());
+ // write avro datum to bytes
+ writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
- InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
+ InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0);
- assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS, false);
+ assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS);
+ }
}
- public static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimensions, boolean isFromPigAvro)
+ static void assertInputRowCorrect(InputRow inputRow, List<String> expectedDimensions)
{
assertEquals(expectedDimensions, inputRow.getDimensions());
assertEquals(1543698L, inputRow.getTimestampFromEpoch());
@@ -316,9 +318,7 @@ public class AvroStreamInputRowParserTest
);
assertEquals(Collections.singletonList(SOME_UNION_VALUE), inputRow.getDimension("someUnion"));
assertEquals(Collections.emptyList(), inputRow.getDimension("someNull"));
- if (isFromPigAvro) {
- assertEquals(String.valueOf(SOME_FIXED_VALUE), Arrays.toString((byte[]) inputRow.getRaw("someFixed")));
- }
+ assertEquals(SOME_FIXED_VALUE, inputRow.getRaw("someFixed"));
assertEquals(
Arrays.toString(SOME_BYTES_VALUE.array()),
Arrays.toString((byte[]) (inputRow.getRaw("someBytes")))
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
new file mode 100644
index 0000000..675a9ce
--- /dev/null
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.avro;
+
+import org.apache.druid.data.input.AvroStreamInputRowParserTest;
+import org.apache.druid.data.input.SomeAvroDatum;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AvroFlattenerMakerTest
+{
+
+ @Test
+ public void getRootField()
+ {
+ final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
+ final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false);
+
+ Assert.assertEquals(
+ record.timestamp,
+ flattener.getRootField(record, "timestamp")
+ );
+ Assert.assertEquals(
+ record.eventType,
+ flattener.getRootField(record, "eventType")
+ );
+ Assert.assertEquals(
+ record.id,
+ flattener.getRootField(record, "id")
+ );
+ Assert.assertEquals(
+ record.someOtherId,
+ flattener.getRootField(record, "someOtherId")
+ );
+ Assert.assertEquals(
+ record.isValid,
+ flattener.getRootField(record, "isValid")
+ );
+ Assert.assertEquals(
+ record.someIntArray,
+ flattener.getRootField(record, "someIntArray")
+ );
+ Assert.assertEquals(
+ record.someStringArray,
+ flattener.getRootField(record, "someStringArray")
+ );
+ Assert.assertEquals(
+ record.someIntValueMap,
+ flattener.getRootField(record, "someIntValueMap")
+ );
+ Assert.assertEquals(
+ record.someStringValueMap,
+ flattener.getRootField(record, "someStringValueMap")
+ );
+ Assert.assertEquals(
+ record.someUnion,
+ flattener.getRootField(record, "someUnion")
+ );
+ Assert.assertEquals(
+ record.someNull,
+ flattener.getRootField(record, "someNull")
+ );
+ Assert.assertEquals(
+ record.someFixed,
+ flattener.getRootField(record, "someFixed")
+ );
+ Assert.assertEquals(
+ // Cast to an array by transformValue
+ record.someBytes.array(),
+ flattener.getRootField(record, "someBytes")
+ );
+ Assert.assertEquals(
+ record.someEnum,
+ flattener.getRootField(record, "someEnum")
+ );
+ Assert.assertEquals(
+ record.someRecord,
+ flattener.getRootField(record, "someRecord")
+ );
+ Assert.assertEquals(
+ record.someLong,
+ flattener.getRootField(record, "someLong")
+ );
+ Assert.assertEquals(
+ record.someInt,
+ flattener.getRootField(record, "someInt")
+ );
+ Assert.assertEquals(
+ record.someFloat,
+ flattener.getRootField(record, "someFloat")
+ );
+ }
+
+ @Test
+ public void makeJsonPathExtractor()
+ {
+ final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
+ final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false);
+
+ Assert.assertEquals(
+ record.timestamp,
+ flattener.makeJsonPathExtractor("$.timestamp").apply(record)
+ );
+ Assert.assertEquals(
+ record.eventType,
+ flattener.makeJsonPathExtractor("$.eventType").apply(record)
+ );
+ Assert.assertEquals(
+ record.id,
+ flattener.makeJsonPathExtractor("$.id").apply(record)
+ );
+ Assert.assertEquals(
+ record.someOtherId,
+ flattener.makeJsonPathExtractor("$.someOtherId").apply(record)
+ );
+ Assert.assertEquals(
+ record.isValid,
+ flattener.makeJsonPathExtractor("$.isValid").apply(record)
+ );
+ Assert.assertEquals(
+ record.someIntArray,
+ flattener.makeJsonPathExtractor("$.someIntArray").apply(record)
+ );
+ Assert.assertEquals(
+ record.someStringArray,
+ flattener.makeJsonPathExtractor("$.someStringArray").apply(record)
+ );
+ Assert.assertEquals(
+ record.someIntValueMap,
+ flattener.makeJsonPathExtractor("$.someIntValueMap").apply(record)
+ );
+ Assert.assertEquals(
+ record.someStringValueMap,
+ flattener.makeJsonPathExtractor("$.someStringValueMap").apply(record)
+ );
+ Assert.assertEquals(
+ record.someUnion,
+ flattener.makeJsonPathExtractor("$.someUnion").apply(record)
+ );
+ Assert.assertEquals(
+ record.someNull,
+ flattener.makeJsonPathExtractor("$.someNull").apply(record)
+ );
+ Assert.assertEquals(
+ record.someFixed,
+ flattener.makeJsonPathExtractor("$.someFixed").apply(record)
+ );
+ Assert.assertEquals(
+ // Cast to an array by transformValue
+ record.someBytes.array(),
+ flattener.makeJsonPathExtractor("$.someBytes").apply(record)
+ );
+ Assert.assertEquals(
+ record.someEnum,
+ flattener.makeJsonPathExtractor("$.someEnum").apply(record)
+ );
+ Assert.assertEquals(
+ record.someRecord,
+ flattener.makeJsonPathExtractor("$.someRecord").apply(record)
+ );
+ Assert.assertEquals(
+ record.someLong,
+ flattener.makeJsonPathExtractor("$.someLong").apply(record)
+ );
+ Assert.assertEquals(
+ record.someInt,
+ flattener.makeJsonPathExtractor("$.someInt").apply(record)
+ );
+ Assert.assertEquals(
+ record.someFloat,
+ flattener.makeJsonPathExtractor("$.someFloat").apply(record)
+ );
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void makeJsonQueryExtractor()
+ {
+ final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum();
+ final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false);
+
+ Assert.assertEquals(
+ record.timestamp,
+ flattener.makeJsonQueryExtractor("$.timestamp").apply(record)
+ );
+ }
+}
diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
index 911819d..5481c0e 100644
--- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
+++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java
@@ -45,8 +45,7 @@ import static org.mockito.Mockito.when;
*/
public class SchemaRegistryBasedAvroBytesDecoderTest
{
-
- SchemaRegistryClient registry;
+ private SchemaRegistryClient registry;
@Before
public void setUp()
@@ -96,7 +95,7 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
}
- byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException
+ private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema);
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
index 1658d17..3c63eb2 100755
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java
@@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.joda.time.DateTime;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@@ -74,7 +75,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
this.recordFlattener = ObjectFlatteners.create(
flattenSpec,
- new AvroFlattenerMaker(false, this.binaryAsString)
+ new AvroFlattenerMaker(this.binaryAsString)
);
}
@@ -92,6 +93,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
/**
* imitate avro extension {@link org.apache.druid.data.input.avro.AvroParsers#parseGenericRecord}
*/
+ @Nonnull
@Override
public List<InputRow> parseBatch(GenericRecord record)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org