Posted to commits@gobblin.apache.org by sh...@apache.org on 2020/01/15 22:25:31 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1015] Adding support for direct Avro and Protobuf writes in Parquet format
This is an automated email from the ASF dual-hosted git repository.
shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f44ad4f [GOBBLIN-1015] Adding support for direct Avro and Protobuf writes in Parquet format
f44ad4f is described below
commit f44ad4f2b19f9fcd00518a748748bd5b72c34df3
Author: Shirshanka Das <sd...@linkedin.com>
AuthorDate: Wed Jan 15 14:25:12 2020 -0800
[GOBBLIN-1015] Adding support for direct Avro and Protobuf writes in Parquet format
Closes #2860 from shirshanka/protoavroparquet
---
defaultEnvironment.gradle | 3 +
gobblin-core-base/build.gradle | 4 +-
.../apache/gobblin/test/AnyToJsonConverter.java | 4 +-
.../apache/gobblin/test/AnyToStringConverter.java | 4 +-
.../apache/gobblin/test/SequentialTestSource.java | 94 ++++++++--
gobblin-docs/sinks/ParquetHdfsDataWriter.md | 6 +-
.../src/main/resources/example-parquet.pull | 34 ++++
.../gobblin-parquet-apache/build.gradle | 2 +
.../gobblin/writer/ParquetDataWriterBuilder.java | 144 +++++++--------
.../gobblin/writer/ParquetHdfsDataWriter.java | 70 --------
.../gobblin/writer/ParquetHdfsDataWriterTest.java | 194 ++++++++++++---------
.../org/apache/gobblin/writer/TestConstants.java | 51 ++----
.../writer/AbstractParquetDataWriterBuilder.java | 73 ++++++++
.../parquet}/writer/ParquetHdfsDataWriter.java | 21 +--
.../parquet/writer/ParquetRecordFormat.java | 22 +--
.../parquet/writer/ParquetWriterConfiguration.java | 107 ++++++++++++
.../gobblin/parquet/writer/ParquetWriterShim.java | 20 +--
.../writer/test/ParquetHdfsDataWriterTestBase.java | 145 +++++++++++++++
.../parquet/writer/test/TestConstantsBase.java | 129 ++++++++++++++
gobblin-modules/gobblin-parquet/build.gradle | 2 +
.../gobblin/writer/ParquetDataWriterBuilder.java | 148 ++++++++--------
.../gobblin/writer/ParquetHdfsDataWriterTest.java | 188 +++++++++++---------
.../org/apache/gobblin/writer/TestConstants.java | 54 +++---
gobblin-test-utils/build.gradle | 63 +++++++
.../buildConfig/findbugs-exclude-filter.xml | 6 +
gobblin-test-utils/src/main/avro/TestRecord.avsc | 23 +++
.../java/org/apache/gobblin/test/TestRecord.java | 0
gobblin-test-utils/src/main/proto/TestRecord.proto | 11 ++
gradle/scripts/dependencyDefinitions.gradle | 7 +-
29 files changed, 1121 insertions(+), 508 deletions(-)
diff --git a/defaultEnvironment.gradle b/defaultEnvironment.gradle
index af64d4e..b5f10a7 100644
--- a/defaultEnvironment.gradle
+++ b/defaultEnvironment.gradle
@@ -28,6 +28,9 @@ subprojects {
maven {
url "http://conjars.org/repo"
}
+ maven {
+ url "https://maven.twttr.com/"
+ }
}
project.buildDir = new File(project.rootProject.buildDir, project.name)
diff --git a/gobblin-core-base/build.gradle b/gobblin-core-base/build.gradle
index 934a22e..80b0f67 100644
--- a/gobblin-core-base/build.gradle
+++ b/gobblin-core-base/build.gradle
@@ -18,11 +18,13 @@
apply plugin: 'java'
apply plugin: 'me.champeau.gradle.jmh'
+
dependencies {
compile project(":gobblin-api")
compile project(":gobblin-utility")
compile project(":gobblin-metrics-libs:gobblin-metrics")
compile project(":gobblin-modules:gobblin-codecs")
+ compile project(":gobblin-test-utils")
compile externalDependency.reactivex
compile externalDependency.avroMapredH2
@@ -34,7 +36,6 @@ dependencies {
compile externalDependency.typesafeConfig
compile externalDependency.findBugsAnnotations
- testCompile project(":gobblin-test-utils")
testCompile externalDependency.testng
testCompile externalDependency.mockito
@@ -51,5 +52,4 @@ jmh {
duplicateClassesStrategy = "EXCLUDE"
}
-
ext.classification="library"
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java
index c6acc06..634098b 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java
@@ -31,12 +31,12 @@ import org.apache.gobblin.util.io.GsonInterfaceAdapter;
/**
* Converts any Object into a Json object
*/
-public class AnyToJsonConverter extends Converter<String, String, Object, JsonElement> {
+public class AnyToJsonConverter extends Converter<Object, String, Object, JsonElement> {
private static final Gson GSON = GsonInterfaceAdapter.getGson(Object.class);
private boolean stripTopLevelType = true; // TODO: Configure
@Override
- public String convertSchema(String inputSchema, WorkUnitState workUnit)
+ public String convertSchema(Object inputSchema, WorkUnitState workUnit)
throws SchemaConversionException {
return "";
}
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java
index adc801e..3684f2d 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java
@@ -26,10 +26,10 @@ import org.apache.gobblin.converter.SingleRecordIterable;
/**
* Converts any Object into a String
*/
-public class AnyToStringConverter extends Converter<String, String, Object, String> {
+public class AnyToStringConverter extends Converter<Object, String, Object, String> {
@Override
- public String convertSchema(String inputSchema, WorkUnitState workUnit)
+ public String convertSchema(Object inputSchema, WorkUnitState workUnit)
throws SchemaConversionException {
return "";
}
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
index 12ba1f2..5872808 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
@@ -39,28 +39,38 @@ import org.apache.gobblin.source.extractor.CheckpointableWatermark;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.DefaultCheckpointableWatermark;
import org.apache.gobblin.source.extractor.Extractor;
-import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.source.extractor.StreamingExtractor;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.ExtractFactory;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.test.proto.TestRecordProtos;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.WatermarkStorage;
-
/**
* A test source that generates a sequence of records; works in both batch and streaming mode.
*/
@Slf4j
-public class SequentialTestSource implements Source<String, Object> {
+public class SequentialTestSource implements Source<Object, Object> {
+
+ private enum InMemoryFormat {
+ POJO,
+ AVRO,
+ PROTOBUF
+ }
+
+
private static final int DEFAULT_NUM_PARALLELISM = 1;
private static final String DEFAULT_NAMESPACE = "TestDB";
private static final String DEFAULT_TABLE = "TestTable";
private static final Integer DEFAULT_NUM_RECORDS_PER_EXTRACT = 100;
public static final String WORK_UNIT_INDEX = "workUnitIndex";
private static final Long DEFAULT_SLEEP_TIME_PER_RECORD_MILLIS = 10L;
+ public static final String MEMORY_FORMAT_KEY = "inMemoryFormat";
+ public static final String DEFAULT_IN_MEMORY_FORMAT = InMemoryFormat.POJO.toString();
private final AtomicBoolean configured = new AtomicBoolean(false);
@@ -69,6 +79,7 @@ public class SequentialTestSource implements Source<String, Object> {
private String table;
private int numRecordsPerExtract;
private long sleepTimePerRecord;
+ private InMemoryFormat inMemFormat;
private final Extract.TableType tableType = Extract.TableType.APPEND_ONLY;
private final ExtractFactory _extractFactory = new ExtractFactory("yyyyMMddHHmmss");
private boolean streaming = false;
@@ -86,6 +97,12 @@ public class SequentialTestSource implements Source<String, Object> {
if (streaming) {
numRecordsPerExtract = Integer.MAX_VALUE;
}
+ inMemFormat = InMemoryFormat.valueOf(ConfigUtils.getString(config, "source." + MEMORY_FORMAT_KEY,
+ DEFAULT_IN_MEMORY_FORMAT));
+ log.info("Source configured with: num_parallelism: {}, namespace: {}, "
+ + "table: {}, numRecordsPerExtract: {}, sleepTimePerRecord: {}, streaming: {}, inMemFormat: {}",
+ this.num_parallelism, this.namespace,
+ this.table, this.numRecordsPerExtract, this.sleepTimePerRecord, this.streaming, this.inMemFormat);
configured.set(true);
}
}
@@ -110,6 +127,7 @@ public class SequentialTestSource implements Source<String, Object> {
workUnit = WorkUnit.create(newExtract(tableType, namespace, table), watermarkInterval);
log.debug("Will be setting watermark interval to " + watermarkInterval.toJson());
workUnit.setProp(WORK_UNIT_INDEX, workUnitState.getWorkunit().getProp(WORK_UNIT_INDEX));
+ workUnit.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString());
}
else
{
@@ -120,6 +138,7 @@ public class SequentialTestSource implements Source<String, Object> {
workUnit = WorkUnit.create(newExtract(tableType, namespace, table), watermarkInterval);
log.debug("Will be setting watermark interval to " + watermarkInterval.toJson());
workUnit.setProp(WORK_UNIT_INDEX, workUnitState.getWorkunit().getProp(WORK_UNIT_INDEX));
+ workUnit.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString());
}
newWorkUnits.add(workUnit);
}
@@ -140,6 +159,7 @@ public class SequentialTestSource implements Source<String, Object> {
LongWatermark expectedHighWatermark = new LongWatermark((i + 1) * numRecordsPerExtract);
workUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedHighWatermark));
workUnit.setProp(WORK_UNIT_INDEX, i);
+ workUnit.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString());
workUnits.add(workUnit);
}
return workUnits;
@@ -150,12 +170,14 @@ public class SequentialTestSource implements Source<String, Object> {
}
- static class TestBatchExtractor implements Extractor<String, Object> {
+ static class TestBatchExtractor implements Extractor<Object, Object> {
private long recordsExtracted = 0;
private final long numRecordsPerExtract;
private LongWatermark currentWatermark;
private long sleepTimePerRecord;
private int partition;
+ private final InMemoryFormat inMemoryFormat;
+ private final Object schema;
WorkUnitState workUnitState;
@@ -169,16 +191,34 @@ public class SequentialTestSource implements Source<String, Object> {
this.numRecordsPerExtract = numRecordsPerExtract;
this.sleepTimePerRecord = sleepTimePerRecord;
this.workUnitState = wuState;
+ this.inMemoryFormat = InMemoryFormat.valueOf(this.workUnitState.getProp(MEMORY_FORMAT_KEY));
+ this.schema = getSchema(inMemoryFormat);
}
@Override
- public String getSchema()
+ public Object getSchema()
throws IOException {
- return "";
+ return this.schema;
+ }
+
+ private Object getSchema(InMemoryFormat inMemoryFormat) {
+ switch (inMemoryFormat) {
+ case POJO: {
+ return TestRecord.class;
+ }
+ case AVRO: {
+ return org.apache.gobblin.test.avro.TestRecord.getClassSchema();
+ }
+ case PROTOBUF: {
+ return TestRecordProtos.TestRecord.class;
+ }
+ default:
+ throw new RuntimeException("Not implemented " + inMemoryFormat.name());
+ }
}
@Override
- public Object readRecord(@Deprecated Object reuse)
+ public RecordEnvelope readRecordEnvelope()
throws DataRecordException, IOException {
if (recordsExtracted < numRecordsPerExtract) {
try {
@@ -186,11 +226,37 @@ public class SequentialTestSource implements Source<String, Object> {
} catch (InterruptedException e) {
Throwables.propagate(e);
}
- TestRecord record = new TestRecord(this.partition, this.currentWatermark.getValue(), null);
+ Object record;
+ switch (this.inMemoryFormat) {
+ case POJO: {
+ record = new TestRecord(this.partition, this.currentWatermark.getValue(), "I am a POJO message");
+ break;
+ }
+ case AVRO: {
+ record = org.apache.gobblin.test.avro.TestRecord.newBuilder()
+ .setPartition(this.partition)
+ .setSequence(this.currentWatermark.getValue())
+ .setPayload("I am an Avro message")
+ .build();
+ break;
+ }
+ case PROTOBUF: {
+ record = TestRecordProtos.TestRecord.newBuilder()
+ .setPartition(this.partition)
+ .setSequence(this.currentWatermark.getValue())
+ .setPayload("I am a Protobuf message")
+ .build();
+ break;
+ }
+ default: throw new RuntimeException("");
+ }
log.debug("Extracted record -> {}", record);
+ RecordEnvelope re = new RecordEnvelope<>(record,
+ new DefaultCheckpointableWatermark(String.valueOf(this.partition),
+ new LongWatermark(this.currentWatermark.getValue())));
currentWatermark.increment();
recordsExtracted++;
- return record;
+ return re;
} else {
return null;
}
@@ -218,7 +284,7 @@ public class SequentialTestSource implements Source<String, Object> {
}
- static class TestStreamingExtractor implements StreamingExtractor<String, Object> {
+ static class TestStreamingExtractor implements StreamingExtractor<Object, Object> {
private Optional<WatermarkStorage> watermarkStorage;
private final TestBatchExtractor extractor;
@@ -233,7 +299,7 @@ public class SequentialTestSource implements Source<String, Object> {
}
@Override
- public String getSchema()
+ public Object getSchema()
throws IOException {
return extractor.getSchema();
}
@@ -241,9 +307,7 @@ public class SequentialTestSource implements Source<String, Object> {
@Override
public RecordEnvelope<Object> readRecordEnvelope()
throws DataRecordException, IOException {
- TestRecord record = (TestRecord) extractor.readRecord(null);
- return new RecordEnvelope<>((Object) record, new DefaultCheckpointableWatermark(""+record.getPartition(),
- new LongWatermark(record.getSequence())));
+ return extractor.readRecordEnvelope();
}
@Override
@@ -288,7 +352,7 @@ public class SequentialTestSource implements Source<String, Object> {
@Override
- public Extractor<String, Object> getExtractor(WorkUnitState state)
+ public Extractor<Object, Object> getExtractor(WorkUnitState state)
throws IOException {
Config config = ConfigFactory.parseProperties(state.getProperties());
configureIfNeeded(config);
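For illustration, a minimal sketch (not part of this commit) of draining the extractor above. It assumes, per the @Override in this diff, that Extractor declares readRecordEnvelope(), and that the method returns one RecordEnvelope per record and null once numRecordsPerExtract records have been produced:

import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.stream.RecordEnvelope;

public class DrainExtractorSketch {
  // Hypothetical driver: print every record an extractor produces.
  public static void drain(Extractor<Object, Object> extractor) throws Exception {
    // Depending on inMemoryFormat this is a Class or an Avro Schema.
    System.out.println("schema: " + extractor.getSchema());
    RecordEnvelope envelope;
    while ((envelope = extractor.readRecordEnvelope()) != null) {
      // The record is a TestRecord POJO, an Avro TestRecord, or a Protobuf TestRecord.
      System.out.println(envelope.getRecord());
    }
  }
}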
diff --git a/gobblin-docs/sinks/ParquetHdfsDataWriter.md b/gobblin-docs/sinks/ParquetHdfsDataWriter.md
index a7a24ea..8485cbf 100644
--- a/gobblin-docs/sinks/ParquetHdfsDataWriter.md
+++ b/gobblin-docs/sinks/ParquetHdfsDataWriter.md
@@ -1,6 +1,6 @@
# Description
-An extension to [`FsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java) that writes in Parquet format in the form of [`Group.java`](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/Group.java). This implementation allows users to specify the CodecFactory to use through the configuration property [`writer.codec.type`](https://gobblin.readthe [...]
+An extension to [`FsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java) that writes in Parquet format in the form of either Avro, Protobuf or [`ParquetGroup`](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/Group.java). This implementation allows users to specify the CodecFactory to use through the configuration property [`writer.codec.typ [...]
# Usage
```
@@ -9,6 +9,9 @@ writer.destination.type=HDFS
writer.output.format=PARQUET
```
+# Example Pipeline Configuration
+* [`example-parquet.pull`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-example/src/main/resources/example-parquet.pull) contains an example of generating test data and writing to Parquet files.
+
# Configuration
@@ -19,6 +22,7 @@ writer.output.format=PARQUET
| writer.parquet.dictionary | To turn dictionary encoding on. Parquet has a dictionary encoding for data with a small number of unique values ( < 10^5 ) that aids in significant compression and boosts processing speed. | true | No |
| writer.parquet.validate | To turn on validation using the schema. This validation is done by [`ParquetWriter`](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java) not by Gobblin. | false | No |
| writer.parquet.version | Version of parquet writer to use. Available versions are v1 and v2. | v1 | No |
+| writer.parquet.format | In-memory format of the record being written to Parquet. [`Options`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java) are AVRO, PROTOBUF and GROUP | GROUP | No |
# Developer Notes
diff --git a/gobblin-example/src/main/resources/example-parquet.pull b/gobblin-example/src/main/resources/example-parquet.pull
new file mode 100644
index 0000000..e61caf6
--- /dev/null
+++ b/gobblin-example/src/main/resources/example-parquet.pull
@@ -0,0 +1,34 @@
+job.name=ExampleParquetWriter
+job.group=Parquet
+job.description=This is a job that generates test data and writes to Parquet
+job.lock.enabled=false
+task.execution.synchronousExecutionModel=false
+
+source.class=org.apache.gobblin.test.SequentialTestSource
+source.numParallelism=2
+source.inMemoryFormat=AVRO
+#source.inMemoryFormat = POJO
+#source.inMemoryFormat = PROTOBUF
+
+fs.uri=file:///
+#work.dir=SET_TO_WORK_DIRECTORY
+
+#converter.classes = CONVERTER_CLASSES_IF_ANY
+
+extract.table.name=TestData
+extract.namespace=org.apache.gobblin.example
+extract.table.type=APPEND_ONLY
+
+state.store.enabled=true
+state.store.fs.uri=${fs.uri}
+#state.store.dir=${work.dir}/store
+
+
+writer.destination.type=HDFS
+writer.output.format=PARQUET
+writer.parquet.format=AVRO
+#writer.parquet.format=PROTOBUF // use this for Protobuf data
+writer.fs.uri=${fs.uri}
+writer.builder.class=org.apache.gobblin.writer.ParquetDataWriterBuilder
+data.publisher.fs.uri=${fs.uri}
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
diff --git a/gobblin-modules/gobblin-parquet-apache/build.gradle b/gobblin-modules/gobblin-parquet-apache/build.gradle
index 560638d..5cf4618 100644
--- a/gobblin-modules/gobblin-parquet-apache/build.gradle
+++ b/gobblin-modules/gobblin-parquet-apache/build.gradle
@@ -23,6 +23,8 @@ dependencies {
compile externalDependency.gson
compile externalDependency.parquet
+ compile externalDependency.parquetAvro
+ compile externalDependency.parquetProto
testCompile externalDependency.testng
testCompile externalDependency.mockito
diff --git a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
index ac4e2d4..2060037 100644
--- a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
+++ b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
@@ -17,96 +17,102 @@
package org.apache.gobblin.writer;
import java.io.IOException;
-import java.util.Optional;
+import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.proto.ProtoParquetWriter;
import org.apache.parquet.schema.MessageType;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import com.google.protobuf.Message;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.ForkOperatorUtils;
+import lombok.extern.slf4j.Slf4j;
-import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+import org.apache.gobblin.parquet.writer.AbstractParquetDataWriterBuilder;
+import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration;
+import org.apache.gobblin.parquet.writer.ParquetWriterShim;
-public class ParquetDataWriterBuilder extends FsDataWriterBuilder<MessageType, Group> {
- public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize";
- public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize";
- public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary";
- public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate";
- public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version";
- public static final String DEFAULT_PARQUET_WRITER = "v1";
-
- @Override
- public DataWriter<Group> build()
- throws IOException {
- Preconditions.checkNotNull(this.destination);
- Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
- Preconditions.checkNotNull(this.schema);
- Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET);
-
- switch (this.destination.getType()) {
- case HDFS:
- return new ParquetHdfsDataWriter(this, this.destination.getProperties());
- default:
- throw new RuntimeException("Unknown destination type: " + this.destination.getType());
- }
- }
+@Slf4j
+public class ParquetDataWriterBuilder<S,D> extends AbstractParquetDataWriterBuilder<S,D> {
/**
- * Build a {@link ParquetWriter<Group>} for given file path with a block size.
- * @param blockSize
- * @param stagingFile
+ * Build a version-specific {@link ParquetWriter} for given {@link ParquetWriterConfiguration}
+ * @param writerConfiguration
* @return
* @throws IOException
*/
- public ParquetWriter<Group> getWriter(int blockSize, Path stagingFile)
+ @Override
+ public ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration writerConfiguration)
throws IOException {
- State state = this.destination.getProperties();
- int pageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE);
- int dictPageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE);
- boolean enableDictionary =
- state.getPropAsBoolean(getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED);
- boolean validate = state.getPropAsBoolean(getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED);
- String rootURI = state.getProp(WRITER_FILE_SYSTEM_URI, LOCAL_FS_URI);
- Path absoluteStagingFile = new Path(rootURI, stagingFile);
- CompressionCodecName codec = getCodecFromConfig();
- GroupWriteSupport support = new GroupWriteSupport();
- Configuration conf = new Configuration();
- GroupWriteSupport.setSchema(this.schema, conf);
- ParquetProperties.WriterVersion writerVersion = getWriterVersion();
- return new ParquetWriter<>(absoluteStagingFile, support, codec, blockSize, pageSize, dictPageSize, enableDictionary,
- validate, writerVersion, conf);
- }
- private ParquetProperties.WriterVersion getWriterVersion() {
- return ParquetProperties.WriterVersion.fromString(
- this.destination.getProperties().getProp(getProperty(WRITER_PARQUET_VERSION), DEFAULT_PARQUET_WRITER));
- }
+ CompressionCodecName codecName = CompressionCodecName.fromConf(writerConfiguration.getCodecName());
+ ParquetProperties.WriterVersion writerVersion = ParquetProperties.WriterVersion
+ .fromString(writerConfiguration.getWriterVersion());
- private CompressionCodecName getCodecFromConfig() {
- State state = this.destination.getProperties();
- String codecValue = Optional.ofNullable(state.getProp(getProperty(WRITER_CODEC_TYPE)))
- .orElse(CompressionCodecName.SNAPPY.toString());
- return CompressionCodecName.valueOf(codecValue.toUpperCase());
- }
+ Configuration conf = new Configuration();
+ ParquetWriter versionSpecificWriter = null;
+ switch (writerConfiguration.getRecordFormat()) {
+ case GROUP: {
+ GroupWriteSupport.setSchema((MessageType) this.schema, conf);
+ WriteSupport support = new GroupWriteSupport();
+ versionSpecificWriter = new ParquetWriter<Group>(
+ writerConfiguration.getAbsoluteStagingFile(),
+ support,
+ codecName,
+ writerConfiguration.getBlockSize(),
+ writerConfiguration.getPageSize(),
+ writerConfiguration.getDictPageSize(),
+ writerConfiguration.isDictionaryEnabled(),
+ writerConfiguration.isValidate(),
+ writerVersion,
+ conf);
+ break;
+ }
+ case AVRO: {
+ versionSpecificWriter = new AvroParquetWriter(
+ writerConfiguration.getAbsoluteStagingFile(),
+ (Schema) this.schema,
+ codecName,
+ writerConfiguration.getBlockSize(),
+ writerConfiguration.getPageSize(),
+ writerConfiguration.isDictionaryEnabled(),
+ conf);
+ break;
+ }
+ case PROTOBUF: {
+ versionSpecificWriter = new ProtoParquetWriter(
+ writerConfiguration.getAbsoluteStagingFile(),
+ (Class<? extends Message>) this.schema,
+ codecName,
+ writerConfiguration.getBlockSize(),
+ writerConfiguration.getPageSize(),
+ writerConfiguration.isDictionaryEnabled(),
+ writerConfiguration.isValidate());
+ break;
+ }
+ default: throw new RuntimeException("Record format not supported");
+ }
+ ParquetWriter finalVersionSpecificWriter = versionSpecificWriter;
+
+ return new ParquetWriterShim() {
+ @Override
+ public void write(Object record)
+ throws IOException {
+ finalVersionSpecificWriter.write(record);
+ }
- private String getProperty(String key) {
- return ForkOperatorUtils.getPropertyNameForBranch(key, this.getBranches(), this.getBranch());
+ @Override
+ public void close()
+ throws IOException {
+ finalVersionSpecificWriter.close();
+ }
+ };
}
}
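To make the AVRO branch above concrete in isolation, here is a self-contained sketch (not part of this commit) that writes a single generic record through the same deprecated AvroParquetWriter constructor the builder uses; the schema is an inline equivalent of the TestRecord.avsc added by this commit, and the output path is illustrative:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AvroToParquetSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"TestRecord\",\"fields\":["
            + "{\"name\":\"partition\",\"type\":\"int\"},"
            + "{\"name\":\"sequence\",\"type\":\"long\"},"
            + "{\"name\":\"payload\",\"type\":\"string\"}]}");
    GenericRecord record = new GenericData.Record(schema);
    record.put("partition", 0);
    record.put("sequence", 1L);
    record.put("payload", "I am an Avro message");
    // Same (deprecated) constructor the AVRO case above passes its configuration to.
    AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<>(
        new Path("/tmp/test.parquet"), schema, CompressionCodecName.SNAPPY,
        128 * 1024 * 1024 /* block size */, 1024 * 1024 /* page size */,
        true /* enable dictionary */, new Configuration());
    writer.write(record);
    writer.close();
  }
}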
diff --git a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
deleted file mode 100644
index 8a2fc9e..0000000
--- a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.writer;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.hadoop.ParquetWriter;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-
-
-/**
- * An extension to {@link FsDataWriter} that writes in Parquet format in the form of {@link Group}s.
- *
- * <p>
- * This implementation allows users to specify the {@link parquet.hadoop.CodecFactory} to use through the configuration
- * property {@link ConfigurationKeys#WRITER_CODEC_TYPE}. By default, the deflate codec is used.
- * </p>
- *
- * @author tilakpatidar
- */
-public class ParquetHdfsDataWriter extends FsDataWriter<Group> {
- private final ParquetWriter<Group> writer;
- protected final AtomicLong count = new AtomicLong(0);
-
- public ParquetHdfsDataWriter(ParquetDataWriterBuilder builder, State state)
- throws IOException {
- super(builder, state);
- this.writer = builder.getWriter((int) this.blockSize, this.stagingFile);
- }
-
- @Override
- public void write(Group record)
- throws IOException {
- this.writer.write(record);
- this.count.incrementAndGet();
- }
-
- @Override
- public long recordsWritten() {
- return this.count.get();
- }
-
- @Override
- public void close()
- throws IOException {
- try {
- this.writer.close();
- } finally {
- super.close();
- }
- }
-}
diff --git a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
index 46b41d7..fe3a51a 100644
--- a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
+++ b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
@@ -21,91 +21,117 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
+import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.proto.ProtoParquetReader;
import org.apache.parquet.schema.MessageType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
+import lombok.extern.slf4j.Slf4j;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_PAGE_SIZE;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_VALIDATE;
+import org.apache.gobblin.parquet.writer.ParquetRecordFormat;
+import org.apache.gobblin.parquet.writer.test.ParquetHdfsDataWriterTestBase;
+import org.apache.gobblin.test.TestRecord;
+import org.apache.gobblin.test.proto.TestRecordProtos;
@Test(groups = {"gobblin.writer"})
-public class ParquetHdfsDataWriterTest {
+@Slf4j
+public class ParquetHdfsDataWriterTest extends ParquetHdfsDataWriterTestBase {
- private MessageType schema;
- private String filePath;
- private ParquetHdfsDataWriter writer;
- private State properties;
+
+ public ParquetHdfsDataWriterTest() {
+ super(new TestConstants());
+ }
+
+ @Override
+ protected DataWriterBuilder getDataWriterBuilder() {
+ return new ParquetDataWriterBuilder();
+ }
@BeforeMethod
public void setUp()
throws Exception {
- // Making the staging and/or output dirs if necessary
- File stagingDir = new File(TestConstants.TEST_STAGING_DIR);
- File outputDir = new File(TestConstants.TEST_OUTPUT_DIR);
- if (!stagingDir.exists()) {
- boolean mkdirs = stagingDir.mkdirs();
- assert mkdirs;
- }
- if (!outputDir.exists()) {
- boolean mkdirs = outputDir.mkdirs();
- assert mkdirs;
- }
- this.schema = TestConstants.PARQUET_SCHEMA;
- this.filePath = getFilePath();
- this.properties = createStateWithConfig();
- this.writer = (ParquetHdfsDataWriter) getParquetDataWriterBuilder().build();
+ super.setUp();
}
- private String getFilePath() {
- return TestConstants.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstants.TEST_EXTRACT_TABLE + "/"
- + TestConstants.TEST_EXTRACT_ID + "_" + TestConstants.TEST_EXTRACT_PULL_TYPE;
+ @Override
+ protected List<TestRecord> readParquetRecordsFromFile(File outputFile, ParquetRecordFormat format)
+ throws IOException {
+ switch (format) {
+ case GROUP: {
+ return readParquetFilesGroup(outputFile);
+ }
+ case PROTOBUF: {
+ return readParquetFilesProto(outputFile);
+ }
+ case AVRO: {
+ return readParquetFilesAvro(outputFile);
+ }
+ default: throw new RuntimeException(format + " is not supported");
+ }
}
- private State createStateWithConfig() {
- State properties = new State();
- properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE);
- properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstants.TEST_FS_URI);
- properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstants.TEST_STAGING_DIR);
- properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstants.TEST_OUTPUT_DIR);
- properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath);
- properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, TestConstants.PARQUET_TEST_FILENAME);
- properties.setProp(WRITER_PARQUET_DICTIONARY, true);
- properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
- properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024);
- properties.setProp(WRITER_PARQUET_VALIDATE, true);
- properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip");
- return properties;
+ private List<TestRecord> readParquetFilesAvro(File outputFile)
+ throws IOException {
+ ParquetReader<org.apache.gobblin.test.avro.TestRecord> reader = null;
+ List<TestRecord> records = new ArrayList<>();
+ try {
+ reader = new AvroParquetReader<>(new Path(outputFile.toString()));
+ for (org.apache.gobblin.test.avro.TestRecord value = reader.read(); value != null; value = reader.read()) {
+ records.add(new TestRecord(value.getPartition(),
+ value.getSequence(),
+ value.getPayload()));
+ }
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (Exception ex) {
+ System.out.println(ex.getMessage());
+ }
+ }
+ }
+ return records;
+
}
- private ParquetDataWriterBuilder getParquetDataWriterBuilder() {
- ParquetDataWriterBuilder writerBuilder = new ParquetDataWriterBuilder();
- writerBuilder.destination = Destination.of(Destination.DestinationType.HDFS, properties);
- writerBuilder.writerId = TestConstants.TEST_WRITER_ID;
- writerBuilder.schema = this.schema;
- writerBuilder.format = WriterOutputFormat.PARQUET;
- return writerBuilder;
+ protected List<TestRecord> readParquetFilesProto(File outputFile)
+ throws IOException {
+ ParquetReader<TestRecordProtos.TestRecordOrBuilder> reader = null;
+ List<TestRecord> records = new ArrayList<>();
+ try {
+ reader = new ProtoParquetReader<>(new Path(outputFile.toString()));
+ for (TestRecordProtos.TestRecordOrBuilder value = reader.read(); value != null; value = reader.read()) {
+ records.add(new TestRecord(value.getPartition(),
+ value.getSequence(),
+ value.getPayload()));
+ }
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (Exception ex) {
+ System.out.println(ex.getMessage());
+ }
+ }
+ }
+ return records;
}
- private List<Group> readParquetFiles(File outputFile)
+
+ protected List<TestRecord> readParquetFilesGroup(File outputFile)
throws IOException {
ParquetReader<Group> reader = null;
List<Group> records = new ArrayList<>();
@@ -123,46 +149,40 @@ public class ParquetHdfsDataWriterTest {
}
}
}
- return records;
+ return records.stream().map(value -> new TestRecord(
+ value.getInteger(TestConstants.PARTITION_FIELD_NAME, 0),
+ value.getLong(TestConstants.SEQUENCE_FIELD_NAME, 0),
+ value.getString(TestConstants.PAYLOAD_FIELD_NAME, 0)
+ )).collect(Collectors.toList());
}
@Test
public void testWrite()
throws Exception {
- long firstWrite;
- long secondWrite;
- List<Group> records;
- Group record1 = TestConstants.PARQUET_RECORD_1;
- Group record2 = TestConstants.PARQUET_RECORD_2;
- String filePath = TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath;
- File outputFile = new File(filePath, TestConstants.PARQUET_TEST_FILENAME);
-
- this.writer.write(record1);
- firstWrite = this.writer.recordsWritten();
- this.writer.write(record2);
- secondWrite = this.writer.recordsWritten();
- this.writer.close();
- this.writer.commit();
- records = readParquetFiles(outputFile);
- Group resultRecord1 = records.get(0);
- Group resultRecord2 = records.get(1);
-
- Assert.assertEquals(firstWrite, 1);
- Assert.assertEquals(secondWrite, 2);
- Assert.assertEquals(resultRecord1.getString("name", 0), "tilak");
- Assert.assertEquals(resultRecord1.getInteger("age", 0), 22);
- Assert.assertEquals(resultRecord2.getString("name", 0), "other");
- Assert.assertEquals(resultRecord2.getInteger("age", 0), 22);
+ super.testWrite();
+ }
+
+ @Override
+ protected Object getSchema(ParquetRecordFormat format) {
+ switch (format) {
+ case GROUP: {
+ return TestConstants.PARQUET_SCHEMA;
+ }
+ case PROTOBUF: {
+ return TestRecordProtos.TestRecord.class;
+ }
+ case AVRO: {
+ return org.apache.gobblin.test.avro.TestRecord.getClassSchema();
+ }
+ default:
+ throw new RuntimeException(format.name() + " is not implemented");
+ }
}
@AfterClass
public void tearDown()
throws IOException {
- // Clean up the staging and/or output directories if necessary
- File testRootDir = new File(TestConstants.TEST_ROOT_DIR);
- if (testRootDir.exists()) {
- FileUtil.fullyDelete(testRootDir);
- }
+ super.tearDown();
}
class SimpleReadSupport extends ReadSupport<Group> {
diff --git a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
index 41d7ee3..9b7f7f6 100644
--- a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
+++ b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -23,40 +23,27 @@ import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
+import org.apache.gobblin.parquet.writer.test.TestConstantsBase;
+import org.apache.gobblin.test.TestRecord;
-public class TestConstants {
- public static final MessageType PARQUET_SCHEMA = Types.buildMessage()
- .addFields(Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name"),
- Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("age")).named("User");
-
- public static final Group PARQUET_RECORD_1 = new SimpleGroup(PARQUET_SCHEMA);
-
- public static final Group PARQUET_RECORD_2 = new SimpleGroup(PARQUET_SCHEMA);
-
- public static final String PARQUET_TEST_FILENAME = "test.parquet";
-
- public static final String TEST_FS_URI = "file:///";
-
- public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis();
-
- public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
- public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output";
+public class TestConstants extends TestConstantsBase<Group> {
- public static final String TEST_WRITER_ID = "writer-1";
-
- public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test";
-
- public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis());
-
- public static final String TEST_EXTRACT_TABLE = "TestTable";
-
- public static final String TEST_EXTRACT_PULL_TYPE = "FULL";
-
- static {
- PARQUET_RECORD_1.add("name", "tilak");
- PARQUET_RECORD_1.add("age", 22);
- PARQUET_RECORD_2.add("name", "other");
- PARQUET_RECORD_2.add("age", 22);
+ public static final MessageType PARQUET_SCHEMA = Types.buildMessage()
+ .addFields(
+ Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
+ .named(TestConstants.PAYLOAD_FIELD_NAME),
+ Types.required(PrimitiveType.PrimitiveTypeName.INT32).named(TestConstants.PARTITION_FIELD_NAME),
+ Types.required(PrimitiveType.PrimitiveTypeName.INT64).named(TestConstants.SEQUENCE_FIELD_NAME))
+ .named("Data");
+
+ @Override
+ public Group convertToParquetGroup(TestRecord record) {
+ Group group = new SimpleGroup(PARQUET_SCHEMA);
+ group.add(PAYLOAD_FIELD_NAME, record.getPayload());
+ group.add(SEQUENCE_FIELD_NAME, Long.valueOf(record.getSequence()));
+ group.add(PARTITION_FIELD_NAME, record.getPartition());
+ return group;
}
+
}
diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/AbstractParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/AbstractParquetDataWriterBuilder.java
new file mode 100644
index 0000000..0ed3965
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/AbstractParquetDataWriterBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.parquet.writer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.FsDataWriterBuilder;
+import org.apache.gobblin.writer.WriterOutputFormat;
+
+
+@Slf4j
+public abstract class AbstractParquetDataWriterBuilder<S,D> extends FsDataWriterBuilder<S, D> {
+
+ @Override
+ public DataWriter<D> build()
+ throws IOException {
+ Preconditions.checkNotNull(this.destination);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
+ Preconditions.checkNotNull(this.schema);
+ Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET);
+
+ switch (this.destination.getType()) {
+ case HDFS:
+ return new ParquetHdfsDataWriter<D>(this, this.destination.getProperties());
+ default:
+ throw new RuntimeException("Unknown destination type: " + this.destination.getType());
+ }
+ }
+
+ protected abstract ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration writerConfiguration)
+ throws IOException;
+
+ /**
+ * Build a {@link ParquetWriterShim <D>} for given file path with a block size.
+ * @param blockSize
+ * @param stagingFile
+ * @return
+ * @throws IOException
+ */
+ public ParquetWriterShim<D> getWriter(int blockSize, Path stagingFile)
+ throws IOException {
+ State state = this.destination.getProperties();
+ ParquetWriterConfiguration writerConfiguration =
+ new ParquetWriterConfiguration(state, this.getBranches(), this.getBranch(), stagingFile, blockSize);
+
+ log.info("Parquet writer configured with {}", writerConfiguration);
+ return getVersionSpecificWriter(writerConfiguration);
+ }
+
+}
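A version-specific module therefore only has to supply getVersionSpecificWriter. A minimal hypothetical subclass (illustrative names; not part of this commit) that satisfies the contract without touching Parquet at all:

import java.io.IOException;

import org.apache.gobblin.parquet.writer.AbstractParquetDataWriterBuilder;
import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration;
import org.apache.gobblin.parquet.writer.ParquetWriterShim;

// Hypothetical no-op builder showing the single extension point; real
// implementations delegate to a version-specific ParquetWriter as above.
public class LoggingParquetDataWriterBuilder<S, D> extends AbstractParquetDataWriterBuilder<S, D> {
  @Override
  protected ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration conf)
      throws IOException {
    return new ParquetWriterShim<Object>() {
      @Override
      public void write(Object record) {
        // Log instead of writing; conf carries the resolved staging path.
        System.out.println("would write " + record + " to " + conf.getAbsoluteStagingFile());
      }
      @Override
      public void close() { }
    };
  }
}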
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetHdfsDataWriter.java
similarity index 70%
rename from gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
rename to gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetHdfsDataWriter.java
index 744c784..47d5624 100644
--- a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetHdfsDataWriter.java
@@ -14,40 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gobblin.writer;
+package org.apache.gobblin.parquet.writer;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
-import parquet.example.data.Group;
-import parquet.hadoop.ParquetWriter;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.writer.FsDataWriter;
/**
- * An extension to {@link FsDataWriter} that writes in Parquet format in the form of {@link Group}s.
+ * An extension to {@link FsDataWriter} that writes in Parquet formats.
*
* <p>
- * This implementation allows users to specify the {@link parquet.hadoop.CodecFactory} to use through the configuration
- * property {@link ConfigurationKeys#WRITER_CODEC_TYPE}. By default, the deflate codec is used.
+ * This implementation allows users to specify different formats and codecs
+ * through {@link ParquetWriterConfiguration} to write data.
* </p>
*
* @author tilakpatidar
*/
-public class ParquetHdfsDataWriter extends FsDataWriter<Group> {
- private final ParquetWriter<Group> writer;
+public class ParquetHdfsDataWriter<D> extends FsDataWriter<D> {
+ private final ParquetWriterShim writer;
protected final AtomicLong count = new AtomicLong(0);
- public ParquetHdfsDataWriter(ParquetDataWriterBuilder builder, State state)
+ public ParquetHdfsDataWriter(AbstractParquetDataWriterBuilder builder, State state)
throws IOException {
super(builder, state);
this.writer = builder.getWriter((int) this.blockSize, this.stagingFile);
}
@Override
- public void write(Group record)
+ public void write(D record)
throws IOException {
this.writer.write(record);
this.count.incrementAndGet();
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java
similarity index 75%
copy from gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java
copy to gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java
index eb07977..4403f03 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java
@@ -14,21 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gobblin.test;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.ToString;
-
+package org.apache.gobblin.parquet.writer;
/**
- * A Test record
+ * Enum of the record formats supported by the Parquet writer.
+ * @see ParquetWriterConfiguration for the configuration keys that select a format
*/
-@Getter
-@ToString
-@AllArgsConstructor
-public class TestRecord {
- private int partition;
- private long sequence;
- private String payload;
+public enum ParquetRecordFormat {
+ GROUP,
+ AVRO,
+ PROTOBUF;
+
}
diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterConfiguration.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterConfiguration.java
new file mode 100644
index 0000000..aeb426f
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterConfiguration.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.parquet.writer;
+
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.ToString;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ForkOperatorUtils;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX;
+
+
+/**
+ * Holds configuration for the {@link ParquetHdfsDataWriter}
+ */
+@Getter @ToString
+public class ParquetWriterConfiguration {
+ public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize";
+ public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize";
+ public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary";
+ public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate";
+ public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version";
+ public static final String DEFAULT_PARQUET_WRITER = "v1";
+ public static final String WRITER_PARQUET_FORMAT = WRITER_PREFIX + ".parquet.format";
+ public static final String DEFAULT_PARQUET_FORMAT = "group";
+
+
+
+ public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
+ public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
+ public static final String DEFAULT_COMPRESSION_CODEC_NAME = "UNCOMPRESSED";
+ public static final String[] ALLOWED_COMPRESSION_CODECS = {"SNAPPY", "LZO", "UNCOMPRESSED", "GZIP"};
+ public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
+ public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
+ public static final String DEFAULT_WRITER_VERSION = "v1";
+ public static final String[] ALLOWED_WRITER_VERSIONS = {"v1", "v2"};
+
+
+ private final int pageSize;
+ private final int dictPageSize;
+ private final boolean dictionaryEnabled;
+ private final boolean validate;
+ private final String writerVersion;
+ private final ParquetRecordFormat recordFormat;
+
+
+ private final int numBranches;
+ private final int branchId;
+ private final String codecName;
+ private final Path absoluteStagingFile;
+ private final int blockSize;
+
+ public ParquetWriterConfiguration(State state, int numBranches, int branchId, Path stagingFile, int blockSize) {
+ this(ConfigUtils.propertiesToConfig(state.getProperties()), numBranches, branchId, stagingFile, blockSize);
+ }
+
+
+ private String getProperty(String key) {
+ return ForkOperatorUtils.getPropertyNameForBranch(key, numBranches, branchId);
+ }
+
+ public static ParquetRecordFormat getRecordFormatFromConfig(Config config) {
+ String writeSupport = ConfigUtils.getString(config, WRITER_PARQUET_FORMAT, DEFAULT_PARQUET_FORMAT);
+ ParquetRecordFormat recordFormat = ParquetRecordFormat.valueOf(writeSupport.toUpperCase());
+ return recordFormat;
+ }
+
+
+ ParquetWriterConfiguration(Config config, int numBranches, int branchId, Path stagingFile, int blockSize) {
+ this.numBranches = numBranches;
+ this.branchId = branchId;
+ this.pageSize = ConfigUtils.getInt(config, getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE);
+ this.dictPageSize = ConfigUtils.getInt(config, getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE);
+ this.dictionaryEnabled =
+ ConfigUtils.getBoolean(config, getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED);
+ this.validate = ConfigUtils.getBoolean(config, getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED);
+ String rootURI = ConfigUtils.getString(config, WRITER_FILE_SYSTEM_URI, LOCAL_FS_URI);
+ this.absoluteStagingFile = new Path(rootURI, stagingFile);
+ this.codecName = ConfigUtils.getString(config,getProperty(WRITER_CODEC_TYPE), DEFAULT_COMPRESSION_CODEC_NAME);
+ this.recordFormat = getRecordFormatFromConfig(config);
+ this.writerVersion = ConfigUtils.getString(config, getProperty(WRITER_PARQUET_VERSION), DEFAULT_WRITER_VERSION);
+ this.blockSize = blockSize;
+ }
+}
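A quick sketch (not part of this commit) exercising these keys through the public constructor above; the values are illustrative, and with a single branch the keys resolve without a branch suffix:

import org.apache.hadoop.fs.Path;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration;

public class ParquetConfigSketch {
  public static void main(String[] args) {
    State state = new State();
    // Keys defined above; unset keys fall back to the documented defaults
    // (v1 writer, GROUP format, UNCOMPRESSED codec).
    state.setProp(ParquetWriterConfiguration.WRITER_PARQUET_PAGE_SIZE, 1024 * 1024);
    state.setProp(ParquetWriterConfiguration.WRITER_PARQUET_VERSION, "v2");
    state.setProp(ParquetWriterConfiguration.WRITER_PARQUET_FORMAT, "avro"); // parsed case-insensitively
    ParquetWriterConfiguration conf = new ParquetWriterConfiguration(
        state, 1 /* branches */, 0 /* branch id */,
        new Path("part-0.parquet"), ParquetWriterConfiguration.DEFAULT_BLOCK_SIZE);
    System.out.println(conf); // @ToString prints the resolved settings
  }
}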
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterShim.java
similarity index 71%
copy from gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java
copy to gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterShim.java
index eb07977..9e64fda 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterShim.java
@@ -14,21 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gobblin.test;
+package org.apache.gobblin.parquet.writer;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.ToString;
+import java.io.Closeable;
+import java.io.IOException;
/**
- * A Test record
+ * An interface that shields the gobblin-parquet-common integration from version-specific Parquet writer interfaces
+ * @param <D>
*/
-@Getter
-@ToString
-@AllArgsConstructor
-public class TestRecord {
- private int partition;
- private long sequence;
- private String payload;
+public interface ParquetWriterShim<D> extends Closeable {
+ void write(D record)
+ throws IOException;
}
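Since the shim extends Closeable, it composes with try-with-resources. A trivial sketch (not part of this commit; real shims delegate to a version-specific ParquetWriter, as in the apache module earlier in this diff):

import java.io.IOException;

import org.apache.gobblin.parquet.writer.ParquetWriterShim;

public class ShimUsageSketch {
  public static void main(String[] args) throws IOException {
    // A toy shim that prints instead of writing Parquet.
    ParquetWriterShim<String> shim = new ParquetWriterShim<String>() {
      @Override public void write(String record) { System.out.println(record); }
      @Override public void close() { System.out.println("closed"); }
    };
    try (ParquetWriterShim<String> w = shim) { // Closeable => auto-closed
      w.write("hello parquet");
    }
  }
}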
diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/ParquetHdfsDataWriterTestBase.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/ParquetHdfsDataWriterTestBase.java
new file mode 100644
index 0000000..10fc1a8
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/ParquetHdfsDataWriterTestBase.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.parquet.writer.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.Assert;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.parquet.writer.ParquetRecordFormat;
+import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration;
+import org.apache.gobblin.test.TestRecord;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+import org.apache.gobblin.writer.Destination;
+import org.apache.gobblin.writer.WriterOutputFormat;
+
+
+/**
+ * Base class for version-specific Parquet data writer tests
+ */
+@Slf4j
+public abstract class ParquetHdfsDataWriterTestBase {
+
+ public ParquetHdfsDataWriterTestBase(TestConstantsBase testConstants)
+ {
+ this.testConstants = testConstants;
+ }
+
+ private final TestConstantsBase testConstants;
+ private String filePath;
+ private DataWriter writer;
+
+ protected abstract DataWriterBuilder getDataWriterBuilder();
+
+ public void setUp()
+ throws Exception {
+ // Making the staging and/or output dirs if necessary
+ File stagingDir = new File(this.testConstants.TEST_STAGING_DIR);
+ File outputDir = new File(this.testConstants.TEST_OUTPUT_DIR);
+ if (!stagingDir.exists()) {
+ boolean mkdirs = stagingDir.mkdirs();
+ assert mkdirs;
+ }
+ if (!outputDir.exists()) {
+ boolean mkdirs = outputDir.mkdirs();
+ assert mkdirs;
+ }
+ this.filePath = getFilePath();
+ }
+
+ private String getFilePath() {
+ return TestConstantsBase.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstantsBase.TEST_EXTRACT_TABLE + "/"
+ + TestConstantsBase.TEST_EXTRACT_ID + "_" + TestConstantsBase.TEST_EXTRACT_PULL_TYPE;
+ }
+
+ private State createStateWithConfig(ParquetRecordFormat format) {
+ State properties = new State();
+ properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE);
+ properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstantsBase.TEST_FS_URI);
+ properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstantsBase.TEST_STAGING_DIR);
+ properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstantsBase.TEST_OUTPUT_DIR);
+ properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath);
+ properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, this.testConstants.getParquetTestFilename(format.name()));
+ properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_DICTIONARY, true);
+ properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
+ properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_PAGE_SIZE, 1024);
+ properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_VALIDATE, true);
+ properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_FORMAT, format.toString());
+ properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip");
+ return properties;
+ }
+
+
+ protected abstract List<TestRecord> readParquetRecordsFromFile(File outputFile, ParquetRecordFormat format) throws IOException;
+
+
+ public void testWrite()
+ throws Exception {
+ ParquetRecordFormat[] formats = ParquetRecordFormat.values();
+ for (ParquetRecordFormat format : formats) {
+ State formatSpecificProperties = createStateWithConfig(format);
+
+ this.writer = getDataWriterBuilder()
+ .writeTo(Destination.of(Destination.DestinationType.HDFS, formatSpecificProperties))
+ .withWriterId(TestConstantsBase.TEST_WRITER_ID)
+ .writeInFormat(WriterOutputFormat.PARQUET)
+ .withSchema(getSchema(format))
+ .build();
+
+ for (int i=0; i < 2; ++i) {
+ Object record = this.testConstants.getRecord(i, format);
+ this.writer.write(record);
+ Assert.assertEquals(i+1, this.writer.recordsWritten());
+ }
+ this.writer.close();
+ this.writer.commit();
+
+ String filePath = TestConstantsBase.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath;
+ File outputFile = new File(filePath, this.testConstants.getParquetTestFilename(format.name()));
+
+ List<TestRecord> records = readParquetRecordsFromFile(outputFile, format);
+ for (int i = 0; i < 2; ++i) {
+ TestRecord resultRecord = records.get(i);
+ log.debug("Testing {} record {}", i, resultRecord);
+ Assert.assertEquals(TestConstantsBase.getPayloadValues()[i], resultRecord.getPayload());
+ Assert.assertEquals(TestConstantsBase.getSequenceValues()[i], resultRecord.getSequence());
+ Assert.assertEquals(TestConstantsBase.getPartitionValues()[i], resultRecord.getPartition());
+ }
+ }
+ }
+
+ protected abstract Object getSchema(ParquetRecordFormat format);
+
+ public void tearDown()
+ throws IOException {
+ // Clean up the staging and/or output directories if necessary
+ File testRootDir = new File(TestConstantsBase.TEST_ROOT_DIR);
+ if (testRootDir.exists()) {
+ FileUtil.fullyDelete(testRootDir);
+ }
+ }
+
+}
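
A version-specific module only has to fill in three hooks on this base class. A skeleton under that assumption (class and constant names here are illustrative; the real gobblin-parquet subclass appears further down in this commit):

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.gobblin.parquet.writer.ParquetRecordFormat;
import org.apache.gobblin.parquet.writer.test.ParquetHdfsDataWriterTestBase;
import org.apache.gobblin.test.TestRecord;
import org.apache.gobblin.writer.DataWriterBuilder;

// Illustrative skeleton of a version-specific test; names are hypothetical.
public class MyParquetHdfsDataWriterTest extends ParquetHdfsDataWriterTestBase {

  public MyParquetHdfsDataWriterTest() {
    super(new MyTestConstants()); // the module's TestConstantsBase subclass (hypothetical name)
  }

  @Override
  protected DataWriterBuilder getDataWriterBuilder() {
    // Return the module's ParquetDataWriterBuilder here.
    throw new UnsupportedOperationException("wire in the version-specific builder");
  }

  @Override
  protected Object getSchema(ParquetRecordFormat format) {
    // Return a Parquet MessageType, an Avro Schema, or a protobuf Message class, keyed on format.
    throw new UnsupportedOperationException("supply a schema per record format");
  }

  @Override
  protected List<TestRecord> readParquetRecordsFromFile(File outputFile, ParquetRecordFormat format)
      throws IOException {
    // Read the committed file back with the module's version-specific Group/Avro/Proto readers.
    throw new UnsupportedOperationException("read back with version-specific readers");
  }
}
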
diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/TestConstantsBase.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/TestConstantsBase.java
new file mode 100644
index 0000000..3326022
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/TestConstantsBase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.parquet.writer.test;
+
+import java.util.Arrays;
+
+import org.apache.avro.generic.GenericRecord;
+
+import com.google.protobuf.Message;
+
+import org.apache.gobblin.parquet.writer.ParquetRecordFormat;
+import org.apache.gobblin.test.TestRecord;
+import org.apache.gobblin.test.proto.TestRecordProtos;
+
+
+/**
+ * Base class holding the constants and record builders shared by the version-specific Parquet writer tests
+ * @param <ParquetGroup> the class that implements ParquetGroup; generic to allow package-specific overrides
+ */
+public abstract class TestConstantsBase<ParquetGroup> {
+
+ public static TestRecord[] getTestValues() {
+ return Arrays.copyOf(TEST_VALUES, TEST_VALUES.length);
+ }
+
+ public static String[] getPayloadValues() {
+ return Arrays.copyOf(PAYLOAD_VALUES, PAYLOAD_VALUES.length);
+ }
+
+ public static int[] getSequenceValues() {
+ return Arrays.copyOf(SEQUENCE_VALUES, SEQUENCE_VALUES.length);
+ }
+
+ public static int[] getPartitionValues() {
+ return Arrays.copyOf(PARTITION_VALUES, PARTITION_VALUES.length);
+ }
+
+ public final String getParquetTestFilename(String format) { return "test-" + format + ".parquet"; }
+
+ public static final String TEST_FS_URI = "file:///";
+
+ public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis();
+
+ public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
+
+ public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output";
+
+ public static final String TEST_WRITER_ID = "writer-1";
+
+ public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test";
+
+ public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis());
+
+ public static final String TEST_EXTRACT_TABLE = "TestTable";
+
+ public static final String TEST_EXTRACT_PULL_TYPE = "FULL";
+
+
+ private static final TestRecord[] TEST_VALUES = new TestRecord[2];
+ public static final String PAYLOAD_FIELD_NAME = "payload";
+ public static final String SEQUENCE_FIELD_NAME = "sequence";
+ public static final String PARTITION_FIELD_NAME = "partition";
+ private static final String[] PAYLOAD_VALUES = {"value1", "value2"};
+ private static final int[] SEQUENCE_VALUES = {1, 2};
+ private static final int[] PARTITION_VALUES = {0, 1};
+ static {
+ for (int i=0; i < 2; ++i) {
+ TestRecord record = new TestRecord(getPartitionValues()[i],
+ getSequenceValues()[i],
+ getPayloadValues()[i]);
+ TEST_VALUES[i] = record;
+ }
+ }
+
+
+ public Object getRecord(int index, ParquetRecordFormat format) {
+ switch (format) {
+ case GROUP: {
+ return convertToParquetGroup(getTestValues()[index]);
+ }
+ case PROTOBUF: {
+ return getProtobufMessage(getTestValues()[index]);
+ }
+ case AVRO: {
+ return getAvroMessage(getTestValues()[index]);
+ }
+ default: {
+ throw new RuntimeException("Not understanding format " + format);
+ }
+ }
+ }
+
+
+ public abstract ParquetGroup convertToParquetGroup(TestRecord record);
+
+
+ private Message getProtobufMessage(TestRecord testValue) {
+ return TestRecordProtos.TestRecord.newBuilder()
+ .setPayload(testValue.getPayload())
+ .setPartition(testValue.getPartition())
+ .setSequence(testValue.getSequence())
+ .build();
+ }
+
+ private GenericRecord getAvroMessage(TestRecord record) {
+ org.apache.gobblin.test.avro.TestRecord testRecord = new org.apache.gobblin.test.avro.TestRecord();
+ testRecord.setPayload(record.getPayload());
+ testRecord.setPartition(record.getPartition());
+ testRecord.setSequence(record.getSequence());
+ return testRecord;
+ }
+
+
+
+}
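
Usage-wise, one logical TestRecord fans out into a format-specific representation per write path. A quick sketch, assuming the gobblin-parquet TestConstants subclass shown later in this commit (RecordFanOutExample is an illustrative name):

import org.apache.avro.generic.GenericRecord;

import com.google.protobuf.Message;

import org.apache.gobblin.parquet.writer.ParquetRecordFormat;
import org.apache.gobblin.writer.TestConstants;

// Illustrative: the same logical record, three wire representations.
public class RecordFanOutExample {
  public static void main(String[] args) {
    TestConstants constants = new TestConstants(); // gobblin-parquet's TestConstantsBase<Group> subclass
    Object group = constants.getRecord(0, ParquetRecordFormat.GROUP);               // parquet Group
    Message proto = (Message) constants.getRecord(0, ParquetRecordFormat.PROTOBUF); // TestRecordProtos.TestRecord
    GenericRecord avro = (GenericRecord) constants.getRecord(0, ParquetRecordFormat.AVRO);
    System.out.println(group + " / " + proto + " / " + avro);
  }
}
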
diff --git a/gobblin-modules/gobblin-parquet/build.gradle b/gobblin-modules/gobblin-parquet/build.gradle
index fb56bb1..cefd633 100644
--- a/gobblin-modules/gobblin-parquet/build.gradle
+++ b/gobblin-modules/gobblin-parquet/build.gradle
@@ -23,6 +23,8 @@ dependencies {
compile externalDependency.gson
compile externalDependency.twitterParquet
+ compile externalDependency.twitterParquetAvro
+ compile externalDependency.twitterParquetProto
testCompile externalDependency.testng
testCompile externalDependency.mockito
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
index 4a47792..a96e079 100644
--- a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
@@ -17,97 +17,101 @@
package org.apache.gobblin.writer;
import java.io.IOException;
-import java.util.Optional;
+import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import com.google.protobuf.Message;
+import lombok.extern.slf4j.Slf4j;
+import parquet.avro.AvroParquetWriter;
import parquet.column.ParquetProperties;
import parquet.example.data.Group;
import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.api.WriteSupport;
import parquet.hadoop.example.GroupWriteSupport;
import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.proto.ProtoParquetWriter;
import parquet.schema.MessageType;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.parquet.writer.AbstractParquetDataWriterBuilder;
+import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration;
+import org.apache.gobblin.parquet.writer.ParquetWriterShim;
-import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX;
-import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
-import static parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
-import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
-
-
-public class ParquetDataWriterBuilder extends FsDataWriterBuilder<MessageType, Group> {
- public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize";
- public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize";
- public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary";
- public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate";
- public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version";
- public static final String DEFAULT_PARQUET_WRITER = "v1";
-
- @Override
- public DataWriter<Group> build()
- throws IOException {
- Preconditions.checkNotNull(this.destination);
- Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
- Preconditions.checkNotNull(this.schema);
- Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET);
-
- switch (this.destination.getType()) {
- case HDFS:
- return new ParquetHdfsDataWriter(this, this.destination.getProperties());
- default:
- throw new RuntimeException("Unknown destination type: " + this.destination.getType());
- }
- }
+@Slf4j
+public class ParquetDataWriterBuilder<S,D> extends AbstractParquetDataWriterBuilder<S,D> {
/**
- * Build a {@link ParquetWriter<Group>} for given file path with a block size.
- * @param blockSize
- * @param stagingFile
+ * Build a version-specific {@link ParquetWriter} for the given {@link ParquetWriterConfiguration}
+ * @param writerConfiguration the Parquet writer configuration to apply
* @return
* @throws IOException
*/
- public ParquetWriter<Group> getWriter(int blockSize, Path stagingFile)
+ @Override
+ public ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration writerConfiguration)
throws IOException {
- State state = this.destination.getProperties();
- int pageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE);
- int dictPageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE);
- boolean enableDictionary =
- state.getPropAsBoolean(getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED);
- boolean validate = state.getPropAsBoolean(getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED);
- String rootURI = state.getProp(WRITER_FILE_SYSTEM_URI, LOCAL_FS_URI);
- Path absoluteStagingFile = new Path(rootURI, stagingFile);
- CompressionCodecName codec = getCodecFromConfig();
- GroupWriteSupport support = new GroupWriteSupport();
- Configuration conf = new Configuration();
- GroupWriteSupport.setSchema(this.schema, conf);
- ParquetProperties.WriterVersion writerVersion = getWriterVersion();
- return new ParquetWriter<>(absoluteStagingFile, support, codec, blockSize, pageSize, dictPageSize, enableDictionary,
- validate, writerVersion, conf);
- }
- private ParquetProperties.WriterVersion getWriterVersion() {
- return ParquetProperties.WriterVersion.fromString(
- this.destination.getProperties().getProp(getProperty(WRITER_PARQUET_VERSION), DEFAULT_PARQUET_WRITER));
- }
+ CompressionCodecName codecName = CompressionCodecName.fromConf(writerConfiguration.getCodecName());
+ ParquetProperties.WriterVersion writerVersion = ParquetProperties.WriterVersion
+ .fromString(writerConfiguration.getWriterVersion());
- private CompressionCodecName getCodecFromConfig() {
- State state = this.destination.getProperties();
- String codecValue = Optional.ofNullable(state.getProp(getProperty(WRITER_CODEC_TYPE)))
- .orElse(CompressionCodecName.SNAPPY.toString());
- return CompressionCodecName.valueOf(codecValue.toUpperCase());
- }
+ Configuration conf = new Configuration();
+ ParquetWriter versionSpecificWriter = null;
+ switch (writerConfiguration.getRecordFormat()) {
+ case GROUP: {
+ GroupWriteSupport.setSchema((MessageType) this.schema, conf);
+ WriteSupport support = new GroupWriteSupport();
+ versionSpecificWriter = new ParquetWriter<Group>(
+ writerConfiguration.getAbsoluteStagingFile(),
+ support,
+ codecName,
+ writerConfiguration.getBlockSize(),
+ writerConfiguration.getPageSize(),
+ writerConfiguration.getDictPageSize(),
+ writerConfiguration.isDictionaryEnabled(),
+ writerConfiguration.isValidate(),
+ writerVersion,
+ conf);
+ break;
+ }
+ case AVRO: {
+ versionSpecificWriter = new AvroParquetWriter(
+ writerConfiguration.getAbsoluteStagingFile(),
+ (Schema) this.schema,
+ codecName,
+ writerConfiguration.getBlockSize(),
+ writerConfiguration.getPageSize(),
+ writerConfiguration.isDictionaryEnabled(),
+ conf);
+ break;
+ }
+ case PROTOBUF: {
+ versionSpecificWriter = new ProtoParquetWriter(
+ writerConfiguration.getAbsoluteStagingFile(),
+ (Class<? extends Message>) this.schema,
+ codecName,
+ writerConfiguration.getBlockSize(),
+ writerConfiguration.getPageSize(),
+ writerConfiguration.isDictionaryEnabled(),
+ writerConfiguration.isValidate());
+ break;
+ }
+ default: throw new RuntimeException("Record format not supported");
+ }
+ ParquetWriter finalVersionSpecificWriter = versionSpecificWriter;
+
+ return new ParquetWriterShim() {
+ @Override
+ public void write(Object record)
+ throws IOException {
+ finalVersionSpecificWriter.write(record);
+ }
- private String getProperty(String key) {
- return ForkOperatorUtils.getPropertyNameForBranch(key, this.getBranches(), this.getBranch());
+ @Override
+ public void close()
+ throws IOException {
+ finalVersionSpecificWriter.close();
+ }
+ };
}
-}
+}
\ No newline at end of file
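
Note that the record format is selected purely through writer configuration rather than a new writer class. A minimal sketch of the relevant state, using only keys exercised by the tests in this commit (ParquetWriterConfigExample is an illustrative name):

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.parquet.writer.ParquetRecordFormat;
import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration;

// Illustrative: steer the writer to the Avro-to-Parquet path purely through job state.
public class ParquetWriterConfigExample {
  public static State avroParquetWriterState() {
    State state = new State();
    state.setProp(ParquetWriterConfiguration.WRITER_PARQUET_FORMAT, ParquetRecordFormat.AVRO.toString());
    state.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip"); // resolved to a CompressionCodecName by the builder
    state.setProp(ParquetWriterConfiguration.WRITER_PARQUET_VALIDATE, true);
    return state;
  }
}
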
diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
index 086b084..0f0aadb 100644
--- a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
+++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
@@ -21,91 +21,115 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import parquet.avro.AvroParquetReader;
import parquet.example.data.Group;
import parquet.example.data.simple.convert.GroupRecordConverter;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.api.InitContext;
import parquet.hadoop.api.ReadSupport;
import parquet.io.api.RecordMaterializer;
+import parquet.proto.ProtoParquetReader;
import parquet.schema.MessageType;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_PAGE_SIZE;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_VALIDATE;
+import org.apache.gobblin.parquet.writer.ParquetRecordFormat;
+import org.apache.gobblin.parquet.writer.test.ParquetHdfsDataWriterTestBase;
+import org.apache.gobblin.test.TestRecord;
+import org.apache.gobblin.test.proto.TestRecordProtos;
@Test(groups = {"gobblin.writer"})
-public class ParquetHdfsDataWriterTest {
+public class ParquetHdfsDataWriterTest extends ParquetHdfsDataWriterTestBase {
- private MessageType schema;
- private String filePath;
- private ParquetHdfsDataWriter writer;
- private State properties;
+ public ParquetHdfsDataWriterTest() {
+ super(new TestConstants());
+ }
@BeforeMethod
public void setUp()
throws Exception {
- // Making the staging and/or output dirs if necessary
- File stagingDir = new File(TestConstants.TEST_STAGING_DIR);
- File outputDir = new File(TestConstants.TEST_OUTPUT_DIR);
- if (!stagingDir.exists()) {
- boolean mkdirs = stagingDir.mkdirs();
- assert mkdirs;
- }
- if (!outputDir.exists()) {
- boolean mkdirs = outputDir.mkdirs();
- assert mkdirs;
- }
- this.schema = TestConstants.PARQUET_SCHEMA;
- this.filePath = getFilePath();
- this.properties = createStateWithConfig();
- this.writer = (ParquetHdfsDataWriter) getParquetDataWriterBuilder().build();
+ super.setUp();
}
- private String getFilePath() {
- return TestConstants.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstants.TEST_EXTRACT_TABLE + "/"
- + TestConstants.TEST_EXTRACT_ID + "_" + TestConstants.TEST_EXTRACT_PULL_TYPE;
+ protected DataWriterBuilder getDataWriterBuilder() {
+ return new ParquetDataWriterBuilder();
}
- private State createStateWithConfig() {
- State properties = new State();
- properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE);
- properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstants.TEST_FS_URI);
- properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstants.TEST_STAGING_DIR);
- properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstants.TEST_OUTPUT_DIR);
- properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath);
- properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, TestConstants.PARQUET_TEST_FILENAME);
- properties.setProp(WRITER_PARQUET_DICTIONARY, true);
- properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
- properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024);
- properties.setProp(WRITER_PARQUET_VALIDATE, true);
- properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip");
- return properties;
+ @Override
+ protected List<TestRecord> readParquetRecordsFromFile(File outputFile, ParquetRecordFormat format)
+ throws IOException {
+ switch (format) {
+ case GROUP: {
+ return readParquetFilesGroup(outputFile);
+ }
+ case PROTOBUF: {
+ return readParquetFilesProto(outputFile);
+ }
+ case AVRO: {
+ return readParquetFilesAvro(outputFile);
+ }
+ default: throw new RuntimeException(format + " is not supported");
+ }
}
- private ParquetDataWriterBuilder getParquetDataWriterBuilder() {
- ParquetDataWriterBuilder writerBuilder = new ParquetDataWriterBuilder();
- writerBuilder.destination = Destination.of(Destination.DestinationType.HDFS, properties);
- writerBuilder.writerId = TestConstants.TEST_WRITER_ID;
- writerBuilder.schema = this.schema;
- writerBuilder.format = WriterOutputFormat.PARQUET;
- return writerBuilder;
+ private List<TestRecord> readParquetFilesAvro(File outputFile)
+ throws IOException {
+ ParquetReader<org.apache.gobblin.test.avro.TestRecord> reader = null;
+ List<TestRecord> records = new ArrayList<>();
+ try {
+ reader = new AvroParquetReader<>(new Path(outputFile.toString()));
+ for (org.apache.gobblin.test.avro.TestRecord value = reader.read(); value != null; value = reader.read()) {
+ records.add(new TestRecord(value.getPartition(),
+ value.getSequence(),
+ value.getPayload()));
+ }
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (Exception ex) {
+ System.out.println(ex.getMessage());
+ }
+ }
+ }
+ return records;
+
}
- private List<Group> readParquetFiles(File outputFile)
+
+ protected List<TestRecord> readParquetFilesProto(File outputFile)
+ throws IOException {
+ ParquetReader<TestRecordProtos.TestRecordOrBuilder> reader = null;
+ List<TestRecord> records = new ArrayList<>();
+ try {
+ reader = new ProtoParquetReader<>(new Path(outputFile.toString()));
+ TestRecordProtos.TestRecordOrBuilder value = reader.read();
+ while (value != null) {
+ records.add(new TestRecord(value.getPartition(),
+ value.getSequence(),
+ value.getPayload()));
+ value = reader.read();
+ }
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (Exception ex) {
+ System.out.println(ex.getMessage());
+ }
+ }
+ }
+ return records;
+ }
+
+ protected List<TestRecord> readParquetFilesGroup(File outputFile)
throws IOException {
ParquetReader<Group> reader = null;
List<Group> records = new ArrayList<>();
@@ -123,46 +147,42 @@ public class ParquetHdfsDataWriterTest {
}
}
}
- return records;
+ return records.stream().map(value -> new TestRecord(
+ value.getInteger(TestConstants.PARTITION_FIELD_NAME, 0),
+ value.getInteger(TestConstants.SEQUENCE_FIELD_NAME, 0),
+ value.getString(TestConstants.PAYLOAD_FIELD_NAME, 0)
+ )).collect(Collectors.toList());
}
+
@Test
public void testWrite()
throws Exception {
- long firstWrite;
- long secondWrite;
- List<Group> records;
- Group record1 = TestConstants.PARQUET_RECORD_1;
- Group record2 = TestConstants.PARQUET_RECORD_2;
- String filePath = TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath;
- File outputFile = new File(filePath, TestConstants.PARQUET_TEST_FILENAME);
-
- this.writer.write(record1);
- firstWrite = this.writer.recordsWritten();
- this.writer.write(record2);
- secondWrite = this.writer.recordsWritten();
- this.writer.close();
- this.writer.commit();
- records = readParquetFiles(outputFile);
- Group resultRecord1 = records.get(0);
- Group resultRecord2 = records.get(1);
-
- Assert.assertEquals(firstWrite, 1);
- Assert.assertEquals(secondWrite, 2);
- Assert.assertEquals(resultRecord1.getString("name", 0), "tilak");
- Assert.assertEquals(resultRecord1.getInteger("age", 0), 22);
- Assert.assertEquals(resultRecord2.getString("name", 0), "other");
- Assert.assertEquals(resultRecord2.getInteger("age", 0), 22);
+ super.testWrite();
+ }
+
+ @Override
+ protected Object getSchema(ParquetRecordFormat format) {
+ switch (format) {
+ case GROUP: {
+ return TestConstants.PARQUET_SCHEMA;
+ }
+ case PROTOBUF: {
+ return TestRecordProtos.TestRecord.class;
+ }
+ case AVRO: {
+ return org.apache.gobblin.test.avro.TestRecord.getClassSchema();
+ }
+ default:
+ throw new RuntimeException(format.name() + " is not implemented");
+ }
}
+
@AfterClass
public void tearDown()
throws IOException {
- // Clean up the staging and/or output directories if necessary
- File testRootDir = new File(TestConstants.TEST_ROOT_DIR);
- if (testRootDir.exists()) {
- FileUtil.fullyDelete(testRootDir);
- }
+ super.tearDown();
}
class SimpleReadSupport extends ReadSupport<Group> {
diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
index e5bf215..6eb58dc 100644
--- a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
+++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -23,40 +23,28 @@ import parquet.schema.OriginalType;
import parquet.schema.PrimitiveType;
import parquet.schema.Types;
+import org.apache.gobblin.parquet.writer.test.TestConstantsBase;
+import org.apache.gobblin.test.TestRecord;
-public class TestConstants {
- public static final MessageType PARQUET_SCHEMA = Types.buildMessage()
- .addFields(Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name"),
- Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("age")).named("User");
-
- public static final Group PARQUET_RECORD_1 = new SimpleGroup(PARQUET_SCHEMA);
-
- public static final Group PARQUET_RECORD_2 = new SimpleGroup(PARQUET_SCHEMA);
-
- public static final String PARQUET_TEST_FILENAME = "test.parquet";
-
- public static final String TEST_FS_URI = "file:///";
-
- public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis();
-
- public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
- public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output";
+public class TestConstants extends TestConstantsBase<Group> {
- public static final String TEST_WRITER_ID = "writer-1";
-
- public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test";
-
- public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis());
-
- public static final String TEST_EXTRACT_TABLE = "TestTable";
-
- public static final String TEST_EXTRACT_PULL_TYPE = "FULL";
-
- static {
- PARQUET_RECORD_1.add("name", "tilak");
- PARQUET_RECORD_1.add("age", 22);
- PARQUET_RECORD_2.add("name", "other");
- PARQUET_RECORD_2.add("age", 22);
+ public static final MessageType PARQUET_SCHEMA = Types.buildMessage()
+ .addFields(
+ Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
+ .named(TestConstants.PAYLOAD_FIELD_NAME),
+ Types.required(PrimitiveType.PrimitiveTypeName.INT32).named(TestConstants.PARTITION_FIELD_NAME),
+ // Sequence field is INT32 instead of INT64, because this version of parquet only supports INT32
+ Types.required(PrimitiveType.PrimitiveTypeName.INT32).named(TestConstants.SEQUENCE_FIELD_NAME))
+ .named("Data");
+
+ @Override
+ public Group convertToParquetGroup(TestRecord record) {
+ Group group = new SimpleGroup(PARQUET_SCHEMA);
+ group.add(PAYLOAD_FIELD_NAME, record.getPayload());
+ group.add(SEQUENCE_FIELD_NAME, Long.valueOf(record.getSequence()).intValue());
+ group.add(PARTITION_FIELD_NAME, record.getPartition());
+ return group;
}
-}
+
+}
\ No newline at end of file
diff --git a/gobblin-test-utils/build.gradle b/gobblin-test-utils/build.gradle
index 3c42427..2402ac7 100644
--- a/gobblin-test-utils/build.gradle
+++ b/gobblin-test-utils/build.gradle
@@ -14,6 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+plugins {
+ id "com.google.protobuf" version "0.8.8"
+ id "com.commercehub.gradle.plugin.avro-base" version "0.9.0"
+}
apply plugin: 'java'
@@ -28,13 +32,72 @@ dependencies {
compile externalDependency.lombok
compile externalDependency.typesafeConfig
compile externalDependency.findBugsAnnotations
+ compile externalDependency.protobuf
+
testCompile externalDependency.testng
testCompile externalDependency.mockito
}
+sourceSets {
+ main {
+ java {
+ srcDir 'src/main/gen-proto'
+ srcDir 'src/main/gen-avro'
+ }
+ resources {
+ srcDir 'src/main/proto'
+ srcDir 'src/main/avro'
+ }
+
+ }
+}
+
+/**
+clean.doFirst {
+ delete file("${projectDir}/src/main/gen-proto/").listFiles()
+ delete file("${projectDir}/src/main/gen-avro/").listFiles()
+}
+**/
+
test {
workingDir rootProject.rootDir
}
+protobuf {
+ generatedFilesBaseDir = "$projectDir/src/main/gen-proto"
+ protoc {
+ artifact = "com.google.protobuf:protoc:3.6.1"
+ }
+ plugins {
+ grpc {
+ artifact = "io.grpc:protoc-gen-grpc-java:1.19.0"
+ }
+ }
+ generateProtoTasks {
+ all()*.plugins {
+ grpc {}
+ }
+ }
+}
+
+avro {
+ stringType = "string"
+}
+
+task generateAvro(type: com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask) {
+ source("src/main/avro")
+ outputDir = file("src/main/gen-avro")
+}
+
+compileJava {
+ dependsOn tasks.generateAvro
+}
+
+checkstyleMain.source = ['src/main/java', 'src/test/java']
+
+findbugs {
+ excludeFilter = file("buildConfig/findbugs-exclude-filter.xml")
+}
+
ext.classification="library"
diff --git a/gobblin-test-utils/buildConfig/findbugs-exclude-filter.xml b/gobblin-test-utils/buildConfig/findbugs-exclude-filter.xml
new file mode 100644
index 0000000..61b34c0
--- /dev/null
+++ b/gobblin-test-utils/buildConfig/findbugs-exclude-filter.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<FindBugsFilter>
+ <Match>
+ <Package name="~org\.apache\.gobblin\.test\.proto.*"/>
+ </Match>
+</FindBugsFilter>
diff --git a/gobblin-test-utils/src/main/avro/TestRecord.avsc b/gobblin-test-utils/src/main/avro/TestRecord.avsc
new file mode 100644
index 0000000..0a5d520
--- /dev/null
+++ b/gobblin-test-utils/src/main/avro/TestRecord.avsc
@@ -0,0 +1,23 @@
+{
+ "type": "record",
+ "name": "TestRecord",
+ "namespace" : "org.apache.gobblin.test.avro",
+ "fields": [
+ {
+ "name": "partition",
+ "type": "int",
+ "doc": "Partition id for the record"
+ },
+ {
+ "name": "sequence",
+ "type": "long",
+ "doc": "The sequence number"
+ },
+ {
+ "name": "payload",
+ "type": "string",
+ "default" : "",
+ "doc": "Human-readable payload"
+ }
+ ]
+}
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java b/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestRecord.java
similarity index 100%
rename from gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java
rename to gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestRecord.java
diff --git a/gobblin-test-utils/src/main/proto/TestRecord.proto b/gobblin-test-utils/src/main/proto/TestRecord.proto
new file mode 100644
index 0000000..53bafbd
--- /dev/null
+++ b/gobblin-test-utils/src/main/proto/TestRecord.proto
@@ -0,0 +1,11 @@
+syntax = "proto2";
+
+option java_package = "org.apache.gobblin.test.proto";
+option java_outer_classname = "TestRecordProtos";
+
+
+message TestRecord {
+ required int32 partition = 1;
+ required int64 sequence = 2;
+ required string payload = 3;
+}
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index f2836f7..7c83a79 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -130,7 +130,7 @@ ext.externalDependency = [
"curatorTest": "org.apache.curator:curator-test:2.10.0",
"hamcrest": "org.hamcrest:hamcrest-all:1.3",
"joptSimple": "net.sf.jopt-simple:jopt-simple:4.9",
- "protobuf": "com.google.protobuf:protobuf-java:2.5.0",
+ "protobuf": "com.google.protobuf:protobuf-java:3.6.1",
"pegasus" : [
"data" : "com.linkedin.pegasus:data:" + pegasusVersion,
"generator" : "com.linkedin.pegasus:generator:" + pegasusVersion,
@@ -169,7 +169,11 @@ ext.externalDependency = [
"hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
"orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.1",
'parquet': 'org.apache.parquet:parquet-hadoop:1.10.1',
+ 'parquetAvro': 'org.apache.parquet:parquet-avro:1.10.1',
+ 'parquetProto': 'org.apache.parquet:parquet-protobuf:1.10.1',
'twitterParquet': 'com.twitter:parquet-hadoop-bundle:1.5.0',
+ 'twitterParquetAvro': 'com.twitter:parquet-avro:1.5.0',
+ 'twitterParquetProto': 'com.twitter:parquet-protobuf:1.5.0',
'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
"slf4j": [
"org.slf4j:slf4j-api:" + slf4jVersion,
@@ -182,6 +186,7 @@ ext.externalDependency = [
],
"postgresConnector": "org.postgresql:postgresql:42.1.4",
"assertj": 'org.assertj:assertj-core:3.8.0',
+ "protobuf": 'com.google.protobuf:protobuf-java:3.6.1',
]
if (!isDefaultEnvironment)