You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by sh...@apache.org on 2020/01/15 22:25:31 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1015] Adding support for direct Avro and Protobuf writes in Parquet format

This is an automated email from the ASF dual-hosted git repository.

shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f44ad4f  [GOBBLIN-1015] Adding support for direct Avro and Protobuf writes in Parquet format
f44ad4f is described below

commit f44ad4f2b19f9fcd00518a748748bd5b72c34df3
Author: Shirshanka Das <sd...@linkedin.com>
AuthorDate: Wed Jan 15 14:25:12 2020 -0800

    [GOBBLIN-1015] Adding support for direct Avro and Protobuf writes in Parquet format
    
    Closes #2860 from shirshanka/protoavroparquet
---
 defaultEnvironment.gradle                          |   3 +
 gobblin-core-base/build.gradle                     |   4 +-
 .../apache/gobblin/test/AnyToJsonConverter.java    |   4 +-
 .../apache/gobblin/test/AnyToStringConverter.java  |   4 +-
 .../apache/gobblin/test/SequentialTestSource.java  |  94 ++++++++--
 gobblin-docs/sinks/ParquetHdfsDataWriter.md        |   6 +-
 .../src/main/resources/example-parquet.pull        |  34 ++++
 .../gobblin-parquet-apache/build.gradle            |   2 +
 .../gobblin/writer/ParquetDataWriterBuilder.java   | 144 +++++++--------
 .../gobblin/writer/ParquetHdfsDataWriter.java      |  70 --------
 .../gobblin/writer/ParquetHdfsDataWriterTest.java  | 194 ++++++++++++---------
 .../org/apache/gobblin/writer/TestConstants.java   |  51 ++----
 .../writer/AbstractParquetDataWriterBuilder.java   |  73 ++++++++
 .../parquet}/writer/ParquetHdfsDataWriter.java     |  21 +--
 .../parquet/writer/ParquetRecordFormat.java        |  22 +--
 .../parquet/writer/ParquetWriterConfiguration.java | 107 ++++++++++++
 .../gobblin/parquet/writer/ParquetWriterShim.java  |  20 +--
 .../writer/test/ParquetHdfsDataWriterTestBase.java | 145 +++++++++++++++
 .../parquet/writer/test/TestConstantsBase.java     | 129 ++++++++++++++
 gobblin-modules/gobblin-parquet/build.gradle       |   2 +
 .../gobblin/writer/ParquetDataWriterBuilder.java   | 148 ++++++++--------
 .../gobblin/writer/ParquetHdfsDataWriterTest.java  | 188 +++++++++++---------
 .../org/apache/gobblin/writer/TestConstants.java   |  54 +++---
 gobblin-test-utils/build.gradle                    |  63 +++++++
 .../buildConfig/findbugs-exclude-filter.xml        |   6 +
 gobblin-test-utils/src/main/avro/TestRecord.avsc   |  23 +++
 .../java/org/apache/gobblin/test/TestRecord.java   |   0
 gobblin-test-utils/src/main/proto/TestRecord.proto |  11 ++
 gradle/scripts/dependencyDefinitions.gradle        |   7 +-
 29 files changed, 1121 insertions(+), 508 deletions(-)

diff --git a/defaultEnvironment.gradle b/defaultEnvironment.gradle
index af64d4e..b5f10a7 100644
--- a/defaultEnvironment.gradle
+++ b/defaultEnvironment.gradle
@@ -28,6 +28,9 @@ subprojects {
     maven {
       url "http://conjars.org/repo"
     }
+    maven {
+      url "https://maven.twttr.com/"
+    }
   }
 
   project.buildDir = new File(project.rootProject.buildDir, project.name)
diff --git a/gobblin-core-base/build.gradle b/gobblin-core-base/build.gradle
index 934a22e..80b0f67 100644
--- a/gobblin-core-base/build.gradle
+++ b/gobblin-core-base/build.gradle
@@ -18,11 +18,13 @@
 apply plugin: 'java'
 apply plugin: 'me.champeau.gradle.jmh'
 
+
 dependencies {
   compile project(":gobblin-api")
   compile project(":gobblin-utility")
   compile project(":gobblin-metrics-libs:gobblin-metrics")
   compile project(":gobblin-modules:gobblin-codecs")
+  compile project(":gobblin-test-utils")
 
   compile externalDependency.reactivex
   compile externalDependency.avroMapredH2
@@ -34,7 +36,6 @@ dependencies {
   compile externalDependency.typesafeConfig
   compile externalDependency.findBugsAnnotations
 
-  testCompile project(":gobblin-test-utils")
   testCompile externalDependency.testng
   testCompile externalDependency.mockito
 
@@ -51,5 +52,4 @@ jmh {
     duplicateClassesStrategy = "EXCLUDE"
 }
 
-
 ext.classification="library"
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java
index c6acc06..634098b 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToJsonConverter.java
@@ -31,12 +31,12 @@ import org.apache.gobblin.util.io.GsonInterfaceAdapter;
 /**
  * Converts any Object into a Json object
  */
-public class AnyToJsonConverter extends Converter<String, String, Object, JsonElement> {
+public class AnyToJsonConverter extends Converter<Object, String, Object, JsonElement> {
   private static final Gson GSON = GsonInterfaceAdapter.getGson(Object.class);
   private boolean stripTopLevelType = true; // TODO: Configure
 
   @Override
-  public String convertSchema(String inputSchema, WorkUnitState workUnit)
+  public String convertSchema(Object inputSchema, WorkUnitState workUnit)
       throws SchemaConversionException {
     return "";
   }
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java
index adc801e..3684f2d 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/AnyToStringConverter.java
@@ -26,10 +26,10 @@ import org.apache.gobblin.converter.SingleRecordIterable;
 /**
  * Converts any Object into a String
  */
-public class AnyToStringConverter extends Converter<String, String, Object, String> {
+public class AnyToStringConverter extends Converter<Object, String, Object, String> {
 
   @Override
-  public String convertSchema(String inputSchema, WorkUnitState workUnit)
+  public String convertSchema(Object inputSchema, WorkUnitState workUnit)
       throws SchemaConversionException {
     return "";
   }
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
index 12ba1f2..5872808 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
@@ -39,28 +39,38 @@ import org.apache.gobblin.source.extractor.CheckpointableWatermark;
 import org.apache.gobblin.source.extractor.DataRecordException;
 import org.apache.gobblin.source.extractor.DefaultCheckpointableWatermark;
 import org.apache.gobblin.source.extractor.Extractor;
-import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.source.extractor.StreamingExtractor;
 import org.apache.gobblin.source.extractor.WatermarkInterval;
 import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.ExtractFactory;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.test.proto.TestRecordProtos;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.WatermarkStorage;
 
-
 /**
  * A Test source that generates a sequence of records, works in batch and streaming mode.
  */
 @Slf4j
-public class SequentialTestSource implements Source<String, Object> {
+public class SequentialTestSource implements Source<Object, Object> {
+
+  private enum InMemoryFormat {
+    POJO,
+    AVRO,
+    PROTOBUF
+  }
+
+
   private static final int DEFAULT_NUM_PARALLELISM = 1;
   private static final String DEFAULT_NAMESPACE = "TestDB";
   private static final String DEFAULT_TABLE = "TestTable";
   private static final Integer DEFAULT_NUM_RECORDS_PER_EXTRACT = 100;
   public static final String WORK_UNIT_INDEX = "workUnitIndex";
   private static final Long DEFAULT_SLEEP_TIME_PER_RECORD_MILLIS = 10L;
+  public static final String MEMORY_FORMAT_KEY = "inMemoryFormat";
+  public static final String DEFAULT_IN_MEMORY_FORMAT = InMemoryFormat.POJO.toString();
 
 
   private final AtomicBoolean configured = new AtomicBoolean(false);
@@ -69,6 +79,7 @@ public class SequentialTestSource implements Source<String, Object> {
   private String table;
   private int numRecordsPerExtract;
   private long sleepTimePerRecord;
+  private InMemoryFormat inMemFormat;
   private final Extract.TableType tableType = Extract.TableType.APPEND_ONLY;
   private final ExtractFactory _extractFactory = new ExtractFactory("yyyyMMddHHmmss");
   private boolean streaming = false;
@@ -86,6 +97,12 @@ public class SequentialTestSource implements Source<String, Object> {
       if (streaming) {
         numRecordsPerExtract = Integer.MAX_VALUE;
       }
+      inMemFormat = InMemoryFormat.valueOf(ConfigUtils.getString(config, "source." + MEMORY_FORMAT_KEY,
+          DEFAULT_IN_MEMORY_FORMAT));
+      log.info("Source configured with: num_parallelism: {}, namespace: {}, "
+          + "table: {}, numRecordsPerExtract: {}, sleepTimePerRecord: {}, streaming: {}, inMemFormat: {}",
+          this.num_parallelism, this.namespace,
+          this.table, this.numRecordsPerExtract, this.sleepTimePerRecord, this.streaming, this.inMemFormat);
       configured.set(true);
     }
   }
@@ -110,6 +127,7 @@ public class SequentialTestSource implements Source<String, Object> {
           workUnit = WorkUnit.create(newExtract(tableType, namespace, table), watermarkInterval);
           log.debug("Will be setting watermark interval to " + watermarkInterval.toJson());
           workUnit.setProp(WORK_UNIT_INDEX, workUnitState.getWorkunit().getProp(WORK_UNIT_INDEX));
+          workUnit.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString());
         }
         else
         {
@@ -120,6 +138,7 @@ public class SequentialTestSource implements Source<String, Object> {
           workUnit = WorkUnit.create(newExtract(tableType, namespace, table), watermarkInterval);
           log.debug("Will be setting watermark interval to " + watermarkInterval.toJson());
           workUnit.setProp(WORK_UNIT_INDEX, workUnitState.getWorkunit().getProp(WORK_UNIT_INDEX));
+          workUnit.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString());
         }
         newWorkUnits.add(workUnit);
       }
@@ -140,6 +159,7 @@ public class SequentialTestSource implements Source<String, Object> {
       LongWatermark expectedHighWatermark = new LongWatermark((i + 1) * numRecordsPerExtract);
       workUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedHighWatermark));
       workUnit.setProp(WORK_UNIT_INDEX, i);
+      workUnit.setProp(MEMORY_FORMAT_KEY, this.inMemFormat.toString());
       workUnits.add(workUnit);
     }
     return workUnits;
@@ -150,12 +170,14 @@ public class SequentialTestSource implements Source<String, Object> {
   }
 
 
-  static class TestBatchExtractor implements Extractor<String, Object> {
+  static class TestBatchExtractor implements Extractor<Object, Object> {
     private long recordsExtracted = 0;
     private final long numRecordsPerExtract;
     private LongWatermark currentWatermark;
     private long sleepTimePerRecord;
     private int partition;
+    private final InMemoryFormat inMemoryFormat;
+    private final Object schema;
     WorkUnitState workUnitState;
 
 
@@ -169,16 +191,34 @@ public class SequentialTestSource implements Source<String, Object> {
       this.numRecordsPerExtract = numRecordsPerExtract;
       this.sleepTimePerRecord = sleepTimePerRecord;
       this.workUnitState = wuState;
+      this.inMemoryFormat = InMemoryFormat.valueOf(this.workUnitState.getProp(MEMORY_FORMAT_KEY));
+      this.schema = getSchema(inMemoryFormat);
     }
 
       @Override
-      public String getSchema()
+      public Object getSchema()
           throws IOException {
-        return "";
+        return this.schema;
+      }
+
+      private Object getSchema(InMemoryFormat inMemoryFormat) {
+        switch (inMemoryFormat) {
+          case POJO: {
+            return TestRecord.class;
+          }
+          case AVRO: {
+            return org.apache.gobblin.test.avro.TestRecord.getClassSchema();
+          }
+          case PROTOBUF: {
+            return TestRecordProtos.TestRecord.class;
+          }
+          default:
+            throw new RuntimeException("Not implemented " + inMemoryFormat.name());
+        }
       }
 
       @Override
-      public Object readRecord(@Deprecated Object reuse)
+      public RecordEnvelope readRecordEnvelope()
           throws DataRecordException, IOException {
         if (recordsExtracted < numRecordsPerExtract) {
           try {
@@ -186,11 +226,37 @@ public class SequentialTestSource implements Source<String, Object> {
           } catch (InterruptedException e) {
             Throwables.propagate(e);
           }
-          TestRecord record = new TestRecord(this.partition, this.currentWatermark.getValue(), null);
+          Object record;
+          switch (this.inMemoryFormat) {
+            case POJO: {
+              record = new TestRecord(this.partition, this.currentWatermark.getValue(), "I am a POJO message");
+              break;
+            }
+            case AVRO: {
+              record = org.apache.gobblin.test.avro.TestRecord.newBuilder()
+                  .setPartition(this.partition)
+                  .setSequence(this.currentWatermark.getValue())
+                  .setPayload("I am an Avro message")
+                  .build();
+              break;
+            }
+            case PROTOBUF: {
+              record = TestRecordProtos.TestRecord.newBuilder()
+                  .setPartition(this.partition)
+                  .setSequence(this.currentWatermark.getValue())
+                  .setPayload("I am a Protobuf message")
+                  .build();
+              break;
+            }
+            default: throw new RuntimeException("");
+          }
           log.debug("Extracted record -> {}", record);
+          RecordEnvelope re = new RecordEnvelope<>(record,
+              new DefaultCheckpointableWatermark(String.valueOf(this.partition),
+              new LongWatermark(this.currentWatermark.getValue())));
           currentWatermark.increment();
           recordsExtracted++;
-          return record;
+          return re;
         } else {
           return null;
         }
@@ -218,7 +284,7 @@ public class SequentialTestSource implements Source<String, Object> {
   }
 
 
-  static class TestStreamingExtractor implements StreamingExtractor<String, Object> {
+  static class TestStreamingExtractor implements StreamingExtractor<Object, Object> {
     private Optional<WatermarkStorage> watermarkStorage;
     private final TestBatchExtractor extractor;
 
@@ -233,7 +299,7 @@ public class SequentialTestSource implements Source<String, Object> {
     }
 
     @Override
-    public String getSchema()
+    public Object getSchema()
     throws IOException {
       return extractor.getSchema();
     }
@@ -241,9 +307,7 @@ public class SequentialTestSource implements Source<String, Object> {
     @Override
     public RecordEnvelope<Object> readRecordEnvelope()
     throws DataRecordException, IOException {
-      TestRecord record = (TestRecord) extractor.readRecord(null);
-      return new RecordEnvelope<>((Object) record, new DefaultCheckpointableWatermark(""+record.getPartition(),
-          new LongWatermark(record.getSequence())));
+      return extractor.readRecordEnvelope();
     }
 
     @Override
@@ -288,7 +352,7 @@ public class SequentialTestSource implements Source<String, Object> {
 
 
   @Override
-  public Extractor<String, Object> getExtractor(WorkUnitState state)
+  public Extractor<Object, Object> getExtractor(WorkUnitState state)
       throws IOException {
     Config config = ConfigFactory.parseProperties(state.getProperties());
     configureIfNeeded(config);
diff --git a/gobblin-docs/sinks/ParquetHdfsDataWriter.md b/gobblin-docs/sinks/ParquetHdfsDataWriter.md
index a7a24ea..8485cbf 100644
--- a/gobblin-docs/sinks/ParquetHdfsDataWriter.md
+++ b/gobblin-docs/sinks/ParquetHdfsDataWriter.md
@@ -1,6 +1,6 @@
 # Description
 
-An extension to [`FsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java) that writes in Parquet format in the form of [`Group.java`](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/Group.java). This implementation allows users to specify the CodecFactory to use through the configuration property [`writer.codec.type`](https://gobblin.readthe [...]
+An extension to [`FsDataWriter`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java) that writes in Parquet format in the form of either Avro, Protobuf or [`ParquetGroup`](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/example/data/Group.java). This implementation allows users to specify the CodecFactory to use through the configuration property [`writer.codec.typ [...]
 
 # Usage
 ```
@@ -9,6 +9,9 @@ writer.destination.type=HDFS
 writer.output.format=PARQUET
 ```
 
+# Example Pipeline Configuration
+* [`example-parquet.pull`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-example/src/main/resources/example-parquet.pull) contains an example of generating test data and writing to Parquet files.
+
 
 # Configuration
 
@@ -19,6 +22,7 @@ writer.output.format=PARQUET
 | writer.parquet.dictionary | To turn dictionary encoding on. Parquet has a dictionary encoding for data with a small number of unique values ( < 10^5 ) that aids in significant compression and boosts processing speed. | true | No |
 | writer.parquet.validate | To turn on validation using the schema. This validation is done by [`ParquetWriter`](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java) not by Gobblin. | false | No |
 | writer.parquet.version | Version of parquet writer to use. Available versions are v1 and v2. | v1 | No |
+| writer.parquet.format | In-memory format of the record being written to Parquet. [`Options`](https://github.com/apache/incubator-gobblin/blob/master/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java) are AVRO, PROTOBUF and GROUP | GROUP | No |
 
 # Developer Notes
 
diff --git a/gobblin-example/src/main/resources/example-parquet.pull b/gobblin-example/src/main/resources/example-parquet.pull
new file mode 100644
index 0000000..e61caf6
--- /dev/null
+++ b/gobblin-example/src/main/resources/example-parquet.pull
@@ -0,0 +1,34 @@
+job.name=ExampleParquetWriter
+job.group=Parquet
+job.description=This is a job that generates test data and writes to Parquet
+job.lock.enabled=false
+task.execution.synchronousExecutionModel=false
+
+source.class=org.apache.gobblin.test.SequentialTestSource
+source.numParallelism=2
+source.inMemoryFormat=AVRO
+#source.inMemoryFormat = POJO
+#source.inMemoryFormat = PROTOBUF
+
+fs.uri=file:///
+#work.dir=SET_TO_WORK_DIRECTORY
+
+#converter.classes = CONVERTER_CLASSES_IF_ANY
+
+extract.table.name=TestData
+extract.namespace=org.apache.gobblin.example
+extract.table.type=APPEND_ONLY
+
+state.store.enabled=true
+state.store.fs.uri=${fs.uri}
+#state.store.dir=${work.dir}/store
+
+
+writer.destination.type=HDFS
+writer.output.format=PARQUET
+writer.parquet.format=AVRO
+# For Protobuf data, use writer.parquet.format=PROTOBUF instead of AVRO
+writer.fs.uri=${fs.uri}
+writer.builder.class=org.apache.gobblin.writer.ParquetDataWriterBuilder
+data.publisher.fs.uri=${fs.uri}
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
diff --git a/gobblin-modules/gobblin-parquet-apache/build.gradle b/gobblin-modules/gobblin-parquet-apache/build.gradle
index 560638d..5cf4618 100644
--- a/gobblin-modules/gobblin-parquet-apache/build.gradle
+++ b/gobblin-modules/gobblin-parquet-apache/build.gradle
@@ -23,6 +23,8 @@ dependencies {
 
   compile externalDependency.gson
   compile externalDependency.parquet
+  compile externalDependency.parquetAvro
+  compile externalDependency.parquetProto
 
   testCompile externalDependency.testng
   testCompile externalDependency.mockito
diff --git a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
index ac4e2d4..2060037 100644
--- a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
+++ b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
@@ -17,96 +17,102 @@
 package org.apache.gobblin.writer;
 
 import java.io.IOException;
-import java.util.Optional;
 
+import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.proto.ProtoParquetWriter;
 import org.apache.parquet.schema.MessageType;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import com.google.protobuf.Message;
 
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.ForkOperatorUtils;
+import lombok.extern.slf4j.Slf4j;
 
-import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+import org.apache.gobblin.parquet.writer.AbstractParquetDataWriterBuilder;
+import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration;
+import org.apache.gobblin.parquet.writer.ParquetWriterShim;
 
 
-public class ParquetDataWriterBuilder extends FsDataWriterBuilder<MessageType, Group> {
-  public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize";
-  public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize";
-  public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary";
-  public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate";
-  public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version";
-  public static final String DEFAULT_PARQUET_WRITER = "v1";
-
-  @Override
-  public DataWriter<Group> build()
-      throws IOException {
-    Preconditions.checkNotNull(this.destination);
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
-    Preconditions.checkNotNull(this.schema);
-    Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET);
-
-    switch (this.destination.getType()) {
-      case HDFS:
-        return new ParquetHdfsDataWriter(this, this.destination.getProperties());
-      default:
-        throw new RuntimeException("Unknown destination type: " + this.destination.getType());
-    }
-  }
+@Slf4j
+public class ParquetDataWriterBuilder<S,D> extends AbstractParquetDataWriterBuilder<S,D> {
 
   /**
-   * Build a {@link ParquetWriter<Group>} for given file path with a block size.
-   * @param blockSize
-   * @param stagingFile
+   * Build a version-specific {@link ParquetWriter} for the given {@link ParquetWriterConfiguration}
+   * @param writerConfiguration
    * @return
    * @throws IOException
    */
-  public ParquetWriter<Group> getWriter(int blockSize, Path stagingFile)
+  @Override
+  public ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration writerConfiguration)
       throws IOException {
-    State state = this.destination.getProperties();
-    int pageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE);
-    int dictPageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE);
-    boolean enableDictionary =
-        state.getPropAsBoolean(getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED);
-    boolean validate = state.getPropAsBoolean(getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED);
-    String rootURI = state.getProp(WRITER_FILE_SYSTEM_URI, LOCAL_FS_URI);
-    Path absoluteStagingFile = new Path(rootURI, stagingFile);
-    CompressionCodecName codec = getCodecFromConfig();
-    GroupWriteSupport support = new GroupWriteSupport();
-    Configuration conf = new Configuration();
-    GroupWriteSupport.setSchema(this.schema, conf);
-    ParquetProperties.WriterVersion writerVersion = getWriterVersion();
-    return new ParquetWriter<>(absoluteStagingFile, support, codec, blockSize, pageSize, dictPageSize, enableDictionary,
-        validate, writerVersion, conf);
-  }
 
-  private ParquetProperties.WriterVersion getWriterVersion() {
-    return ParquetProperties.WriterVersion.fromString(
-        this.destination.getProperties().getProp(getProperty(WRITER_PARQUET_VERSION), DEFAULT_PARQUET_WRITER));
-  }
+    CompressionCodecName codecName = CompressionCodecName.fromConf(writerConfiguration.getCodecName());
+    ParquetProperties.WriterVersion writerVersion = ParquetProperties.WriterVersion
+        .fromString(writerConfiguration.getWriterVersion());
 
-  private CompressionCodecName getCodecFromConfig() {
-    State state = this.destination.getProperties();
-    String codecValue = Optional.ofNullable(state.getProp(getProperty(WRITER_CODEC_TYPE)))
-        .orElse(CompressionCodecName.SNAPPY.toString());
-    return CompressionCodecName.valueOf(codecValue.toUpperCase());
-  }
+    Configuration conf = new Configuration();
+    ParquetWriter versionSpecificWriter = null;
+    switch (writerConfiguration.getRecordFormat()) {
+      case GROUP: {
+        GroupWriteSupport.setSchema((MessageType) this.schema, conf);
+        WriteSupport support = new GroupWriteSupport();
+        versionSpecificWriter = new ParquetWriter<Group>(
+            writerConfiguration.getAbsoluteStagingFile(),
+            support,
+            codecName,
+            writerConfiguration.getBlockSize(),
+            writerConfiguration.getPageSize(),
+            writerConfiguration.getDictPageSize(),
+            writerConfiguration.isDictionaryEnabled(),
+            writerConfiguration.isValidate(),
+            writerVersion,
+            conf);
+        break;
+      }
+      case AVRO:  {
+        versionSpecificWriter = new AvroParquetWriter(
+            writerConfiguration.getAbsoluteStagingFile(),
+            (Schema) this.schema,
+            codecName,
+            writerConfiguration.getBlockSize(),
+            writerConfiguration.getPageSize(),
+            writerConfiguration.isDictionaryEnabled(),
+            conf);
+        break;
+      }
+      case PROTOBUF: {
+        versionSpecificWriter = new ProtoParquetWriter(
+            writerConfiguration.getAbsoluteStagingFile(),
+            (Class<? extends Message>) this.schema,
+            codecName,
+            writerConfiguration.getBlockSize(),
+            writerConfiguration.getPageSize(),
+            writerConfiguration.isDictionaryEnabled(),
+            writerConfiguration.isValidate());
+        break;
+      }
+      default: throw new RuntimeException("Record format not supported");
+    }
+    ParquetWriter finalVersionSpecificWriter = versionSpecificWriter;
+
+    return new ParquetWriterShim() {
+      @Override
+      public void write(Object record)
+          throws IOException {
+        finalVersionSpecificWriter.write(record);
+      }
 
-  private String getProperty(String key) {
-    return ForkOperatorUtils.getPropertyNameForBranch(key, this.getBranches(), this.getBranch());
+      @Override
+      public void close()
+          throws IOException {
+        finalVersionSpecificWriter.close();
+      }
+    };
   }
 }
diff --git a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java b/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
deleted file mode 100644
index 8a2fc9e..0000000
--- a/gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.writer;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.hadoop.ParquetWriter;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-
-
-/**
- * An extension to {@link FsDataWriter} that writes in Parquet format in the form of {@link Group}s.
- *
- * <p>
- *   This implementation allows users to specify the {@link parquet.hadoop.CodecFactory} to use through the configuration
- *   property {@link ConfigurationKeys#WRITER_CODEC_TYPE}. By default, the deflate codec is used.
- * </p>
- *
- * @author tilakpatidar
- */
-public class ParquetHdfsDataWriter extends FsDataWriter<Group> {
-  private final ParquetWriter<Group> writer;
-  protected final AtomicLong count = new AtomicLong(0);
-
-  public ParquetHdfsDataWriter(ParquetDataWriterBuilder builder, State state)
-      throws IOException {
-    super(builder, state);
-    this.writer = builder.getWriter((int) this.blockSize, this.stagingFile);
-  }
-
-  @Override
-  public void write(Group record)
-      throws IOException {
-    this.writer.write(record);
-    this.count.incrementAndGet();
-  }
-
-  @Override
-  public long recordsWritten() {
-    return this.count.get();
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    try {
-      this.writer.close();
-    } finally {
-      super.close();
-    }
-  }
-}
diff --git a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
index 46b41d7..fe3a51a 100644
--- a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
+++ b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
@@ -21,91 +21,117 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
+import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.api.InitContext;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.proto.ProtoParquetReader;
 import org.apache.parquet.schema.MessageType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
+import lombok.extern.slf4j.Slf4j;
 
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_PAGE_SIZE;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_VALIDATE;
+import org.apache.gobblin.parquet.writer.ParquetRecordFormat;
+import org.apache.gobblin.parquet.writer.test.ParquetHdfsDataWriterTestBase;
+import org.apache.gobblin.test.TestRecord;
+import org.apache.gobblin.test.proto.TestRecordProtos;
 
 
 @Test(groups = {"gobblin.writer"})
-public class ParquetHdfsDataWriterTest {
+@Slf4j
+public class ParquetHdfsDataWriterTest extends ParquetHdfsDataWriterTestBase {
 
-  private MessageType schema;
-  private String filePath;
-  private ParquetHdfsDataWriter writer;
-  private State properties;
+
+  public ParquetHdfsDataWriterTest() {
+    super(new TestConstants());
+  }
+
+  @Override
+  protected DataWriterBuilder getDataWriterBuilder() {
+    return new ParquetDataWriterBuilder();
+  }
 
   @BeforeMethod
   public void setUp()
       throws Exception {
-    // Making the staging and/or output dirs if necessary
-    File stagingDir = new File(TestConstants.TEST_STAGING_DIR);
-    File outputDir = new File(TestConstants.TEST_OUTPUT_DIR);
-    if (!stagingDir.exists()) {
-      boolean mkdirs = stagingDir.mkdirs();
-      assert mkdirs;
-    }
-    if (!outputDir.exists()) {
-      boolean mkdirs = outputDir.mkdirs();
-      assert mkdirs;
-    }
-    this.schema = TestConstants.PARQUET_SCHEMA;
-    this.filePath = getFilePath();
-    this.properties = createStateWithConfig();
-    this.writer = (ParquetHdfsDataWriter) getParquetDataWriterBuilder().build();
+    super.setUp();
   }
 
-  private String getFilePath() {
-    return TestConstants.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstants.TEST_EXTRACT_TABLE + "/"
-        + TestConstants.TEST_EXTRACT_ID + "_" + TestConstants.TEST_EXTRACT_PULL_TYPE;
+  @Override
+  protected List<TestRecord> readParquetRecordsFromFile(File outputFile, ParquetRecordFormat format)
+      throws IOException {
+    switch (format) {
+      case GROUP: {
+        return readParquetFilesGroup(outputFile);
+      }
+      case PROTOBUF: {
+        return readParquetFilesProto(outputFile);
+      }
+      case AVRO: {
+        return readParquetFilesAvro(outputFile);
+      }
+      default: throw new RuntimeException(format + " is not supported");
+    }
   }
 
-  private State createStateWithConfig() {
-    State properties = new State();
-    properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE);
-    properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstants.TEST_FS_URI);
-    properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstants.TEST_STAGING_DIR);
-    properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstants.TEST_OUTPUT_DIR);
-    properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath);
-    properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, TestConstants.PARQUET_TEST_FILENAME);
-    properties.setProp(WRITER_PARQUET_DICTIONARY, true);
-    properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
-    properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024);
-    properties.setProp(WRITER_PARQUET_VALIDATE, true);
-    properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip");
-    return properties;
+  private List<TestRecord> readParquetFilesAvro(File outputFile)
+      throws IOException {
+    ParquetReader<org.apache.gobblin.test.avro.TestRecord> reader = null;
+    List<TestRecord> records = new ArrayList<>();
+    try {
+      reader = new AvroParquetReader<>(new Path(outputFile.toString()));
+      for (org.apache.gobblin.test.avro.TestRecord value = reader.read(); value != null; value = reader.read()) {
+        records.add(new TestRecord(value.getPartition(),
+            value.getSequence(),
+            value.getPayload()));
+      }
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (Exception ex) {
+          System.out.println(ex.getMessage());
+        }
+      }
+    }
+    return records;
+
   }
 
-  private ParquetDataWriterBuilder getParquetDataWriterBuilder() {
-    ParquetDataWriterBuilder writerBuilder = new ParquetDataWriterBuilder();
-    writerBuilder.destination = Destination.of(Destination.DestinationType.HDFS, properties);
-    writerBuilder.writerId = TestConstants.TEST_WRITER_ID;
-    writerBuilder.schema = this.schema;
-    writerBuilder.format = WriterOutputFormat.PARQUET;
-    return writerBuilder;
+  protected List<TestRecord> readParquetFilesProto(File outputFile)
+      throws IOException {
+    ParquetReader<TestRecordProtos.TestRecordOrBuilder> reader = null;
+    List<TestRecord> records = new ArrayList<>();
+    try {
+      reader = new ProtoParquetReader<>(new Path(outputFile.toString()));
+      for (TestRecordProtos.TestRecordOrBuilder value = reader.read(); value != null; value = reader.read()) {
+        records.add(new TestRecord(value.getPartition(),
+            value.getSequence(),
+            value.getPayload()));
+      }
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (Exception ex) {
+          System.out.println(ex.getMessage());
+        }
+      }
+    }
+    return records;
   }
 
-  private List<Group> readParquetFiles(File outputFile)
+
+  protected List<TestRecord> readParquetFilesGroup(File outputFile)
       throws IOException {
     ParquetReader<Group> reader = null;
     List<Group> records = new ArrayList<>();
@@ -123,46 +149,40 @@ public class ParquetHdfsDataWriterTest {
         }
       }
     }
-    return records;
+    return records.stream().map(value -> new TestRecord(
+        value.getInteger(TestConstants.PARTITION_FIELD_NAME, 0),
+        value.getLong(TestConstants.SEQUENCE_FIELD_NAME, 0),
+        value.getString(TestConstants.PAYLOAD_FIELD_NAME, 0)
+    )).collect(Collectors.toList());
   }
 
   @Test
   public void testWrite()
       throws Exception {
-    long firstWrite;
-    long secondWrite;
-    List<Group> records;
-    Group record1 = TestConstants.PARQUET_RECORD_1;
-    Group record2 = TestConstants.PARQUET_RECORD_2;
-    String filePath = TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath;
-    File outputFile = new File(filePath, TestConstants.PARQUET_TEST_FILENAME);
-
-    this.writer.write(record1);
-    firstWrite = this.writer.recordsWritten();
-    this.writer.write(record2);
-    secondWrite = this.writer.recordsWritten();
-    this.writer.close();
-    this.writer.commit();
-    records = readParquetFiles(outputFile);
-    Group resultRecord1 = records.get(0);
-    Group resultRecord2 = records.get(1);
-
-    Assert.assertEquals(firstWrite, 1);
-    Assert.assertEquals(secondWrite, 2);
-    Assert.assertEquals(resultRecord1.getString("name", 0), "tilak");
-    Assert.assertEquals(resultRecord1.getInteger("age", 0), 22);
-    Assert.assertEquals(resultRecord2.getString("name", 0), "other");
-    Assert.assertEquals(resultRecord2.getInteger("age", 0), 22);
+    super.testWrite();
+  }
+
+  @Override
+  protected Object getSchema(ParquetRecordFormat format) {
+    switch (format) {
+      case GROUP: {
+        return TestConstants.PARQUET_SCHEMA;
+      }
+      case PROTOBUF: {
+        return TestRecordProtos.TestRecord.class;
+      }
+      case AVRO: {
+        return org.apache.gobblin.test.avro.TestRecord.getClassSchema();
+      }
+      default:
+        throw new RuntimeException(format.name() + " is not implemented");
+    }
   }
 
   @AfterClass
   public void tearDown()
       throws IOException {
-    // Clean up the staging and/or output directories if necessary
-    File testRootDir = new File(TestConstants.TEST_ROOT_DIR);
-    if (testRootDir.exists()) {
-      FileUtil.fullyDelete(testRootDir);
-    }
+    super.tearDown();
   }
 
   class SimpleReadSupport extends ReadSupport<Group> {
diff --git a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
index 41d7ee3..9b7f7f6 100644
--- a/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
+++ b/gobblin-modules/gobblin-parquet-apache/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -23,40 +23,27 @@ import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Types;
 
+import org.apache.gobblin.parquet.writer.test.TestConstantsBase;
+import org.apache.gobblin.test.TestRecord;
 
-public class TestConstants {
-  public static final MessageType PARQUET_SCHEMA = Types.buildMessage()
-      .addFields(Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name"),
-          Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("age")).named("User");
-
-  public static final Group PARQUET_RECORD_1 = new SimpleGroup(PARQUET_SCHEMA);
-
-  public static final Group PARQUET_RECORD_2 = new SimpleGroup(PARQUET_SCHEMA);
-
-  public static final String PARQUET_TEST_FILENAME = "test.parquet";
-
-  public static final String TEST_FS_URI = "file:///";
-
-  public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis();
-
-  public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
 
-  public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output";
+public class TestConstants extends TestConstantsBase<Group> {
 
-  public static final String TEST_WRITER_ID = "writer-1";
-
-  public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test";
-
-  public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis());
-
-  public static final String TEST_EXTRACT_TABLE = "TestTable";
-
-  public static final String TEST_EXTRACT_PULL_TYPE = "FULL";
-
-  static {
-    PARQUET_RECORD_1.add("name", "tilak");
-    PARQUET_RECORD_1.add("age", 22);
-    PARQUET_RECORD_2.add("name", "other");
-    PARQUET_RECORD_2.add("age", 22);
+  public static final MessageType PARQUET_SCHEMA = Types.buildMessage()
+      .addFields(
+          Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
+              .named(TestConstants.PAYLOAD_FIELD_NAME),
+          Types.required(PrimitiveType.PrimitiveTypeName.INT32).named(TestConstants.PARTITION_FIELD_NAME),
+          Types.required(PrimitiveType.PrimitiveTypeName.INT64).named(TestConstants.SEQUENCE_FIELD_NAME))
+      .named("Data");
+
+  @Override
+  public Group convertToParquetGroup(TestRecord record) {
+    Group group = new SimpleGroup(PARQUET_SCHEMA);
+    group.add(PAYLOAD_FIELD_NAME, record.getPayload());
+    group.add(SEQUENCE_FIELD_NAME, Long.valueOf(record.getSequence()));
+    group.add(PARTITION_FIELD_NAME, record.getPartition());
+    return group;
   }
+
 }
diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/AbstractParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/AbstractParquetDataWriterBuilder.java
new file mode 100644
index 0000000..0ed3965
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/AbstractParquetDataWriterBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.parquet.writer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.FsDataWriterBuilder;
+import org.apache.gobblin.writer.WriterOutputFormat;
+
+
+@Slf4j
+public abstract class AbstractParquetDataWriterBuilder<S,D> extends FsDataWriterBuilder<S, D> {
+
+  @Override
+  public DataWriter<D> build()
+      throws IOException {
+    Preconditions.checkNotNull(this.destination);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
+    Preconditions.checkNotNull(this.schema);
+    Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET);
+
+    switch (this.destination.getType()) {
+      case HDFS:
+        return new ParquetHdfsDataWriter<D>(this, this.destination.getProperties());
+      default:
+        throw new RuntimeException("Unknown destination type: " + this.destination.getType());
+    }
+  }
+
+  protected abstract ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration writerConfiguration)
+      throws IOException;
+
+  /**
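+   * Build a {@link ParquetWriterShim} for the given staging file path with the given block size.
+   * @param blockSize the block size to record in the {@link ParquetWriterConfiguration}
+   * @param stagingFile the staging file path to record in the {@link ParquetWriterConfiguration}
+   * @return the writer produced by {@link #getVersionSpecificWriter(ParquetWriterConfiguration)}
+   * @throws IOException if the version-specific writer cannot be created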
+   */
+  public ParquetWriterShim<D> getWriter(int blockSize, Path stagingFile)
+      throws IOException {
+    State state = this.destination.getProperties();
+    ParquetWriterConfiguration writerConfiguration =
+        new ParquetWriterConfiguration(state, this.getBranches(), this.getBranch(), stagingFile, blockSize);
+
+    log.info("Parquet writer configured with {}", writerConfiguration);
+    return getVersionSpecificWriter(writerConfiguration);
+  }
+
+}
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetHdfsDataWriter.java
similarity index 70%
rename from gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
rename to gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetHdfsDataWriter.java
index 744c784..47d5624 100644
--- a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetHdfsDataWriter.java
@@ -14,40 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gobblin.writer;
+package org.apache.gobblin.parquet.writer;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
-import parquet.example.data.Group;
-import parquet.hadoop.ParquetWriter;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.writer.FsDataWriter;
 
 
 /**
- * An extension to {@link FsDataWriter} that writes in Parquet format in the form of {@link Group}s.
+ * An extension to {@link FsDataWriter} that writes records in Parquet format.
  *
  * <p>
- *   This implementation allows users to specify the {@link parquet.hadoop.CodecFactory} to use through the configuration
- *   property {@link ConfigurationKeys#WRITER_CODEC_TYPE}. By default, the deflate codec is used.
+ *   This implementation allows users to specify different formats and codecs
+ *   through {@link ParquetWriterConfiguration} to write data.
  * </p>
  *
  * @author tilakpatidar
  */
-public class ParquetHdfsDataWriter extends FsDataWriter<Group> {
-  private final ParquetWriter<Group> writer;
+public class ParquetHdfsDataWriter<D> extends FsDataWriter<D> {
+  private final ParquetWriterShim writer;
   protected final AtomicLong count = new AtomicLong(0);
 
-  public ParquetHdfsDataWriter(ParquetDataWriterBuilder builder, State state)
+  public ParquetHdfsDataWriter(AbstractParquetDataWriterBuilder builder, State state)
       throws IOException {
     super(builder, state);
     this.writer = builder.getWriter((int) this.blockSize, this.stagingFile);
   }
 
   @Override
-  public void write(Group record)
+  public void write(D record)
       throws IOException {
     this.writer.write(record);
     this.count.incrementAndGet();
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java
similarity index 75%
copy from gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java
copy to gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java
index eb07977..4403f03 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetRecordFormat.java
@@ -14,21 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gobblin.test;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.ToString;
-
+package org.apache.gobblin.parquet.writer;
 
 /**
- * A Test record
+ * Enum of the record formats supported by the Parquet writer.
+ * @see ParquetWriterConfiguration#WRITER_PARQUET_FORMAT the configuration key that selects the format
  */
-@Getter
-@ToString
-@AllArgsConstructor
-public class TestRecord {
-  private int partition;
-  private long sequence;
-  private String payload;
+public enum ParquetRecordFormat {
+    GROUP,
+    AVRO,
+    PROTOBUF;
+
 }
diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterConfiguration.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterConfiguration.java
new file mode 100644
index 0000000..aeb426f
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterConfiguration.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.parquet.writer;
+
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.ToString;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ForkOperatorUtils;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX;
+
+
+/**
+ * Holds configuration for the {@link ParquetHdfsDataWriter}
+ */
+@Getter @ToString
+public class ParquetWriterConfiguration {
+  public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize";
+  public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize";
+  public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary";
+  public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate";
+  public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version";
+  public static final String DEFAULT_PARQUET_WRITER = "v1";
+  public static final String WRITER_PARQUET_FORMAT = WRITER_PREFIX + ".parquet.format";
+  public static final String DEFAULT_PARQUET_FORMAT = "group";
+
+
+
+  public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
+  public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
+  public static final String DEFAULT_COMPRESSION_CODEC_NAME = "UNCOMPRESSED";
+  public static final String[] ALLOWED_COMPRESSION_CODECS = {"SNAPPY", "LZO", "UNCOMPRESSED", "GZIP"};
+  public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
+  public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
+  public static final String DEFAULT_WRITER_VERSION = "v1";
+  public static final String[] ALLOWED_WRITER_VERSIONS = {"v1", "v2"};
+
+
+  private final int pageSize;
+  private final int dictPageSize;
+  private final boolean dictionaryEnabled;
+  private final boolean validate;
+  private final String writerVersion;
+  private final ParquetRecordFormat recordFormat;
+
+
+  private final int numBranches;
+  private final int branchId;
+  private final String codecName;
+  private final Path absoluteStagingFile;
+  private final int blockSize;
+
+  public ParquetWriterConfiguration(State state, int numBranches, int branchId, Path stagingFile, int blockSize) {
+    this(ConfigUtils.propertiesToConfig(state.getProperties()), numBranches, branchId, stagingFile, blockSize);
+  }
+
+
+  private String getProperty(String key) {
+    return ForkOperatorUtils.getPropertyNameForBranch(key, numBranches, branchId);
+  }
+
+  public static ParquetRecordFormat getRecordFormatFromConfig(Config config) {
+    String writeSupport = ConfigUtils.getString(config, WRITER_PARQUET_FORMAT, DEFAULT_PARQUET_FORMAT);
+    ParquetRecordFormat recordFormat = ParquetRecordFormat.valueOf(writeSupport.toUpperCase());
+    return recordFormat;
+  }
+
+
+  ParquetWriterConfiguration(Config config, int numBranches, int branchId, Path stagingFile, int blockSize) {
+    this.numBranches = numBranches;
+    this.branchId = branchId;
+    this.pageSize = ConfigUtils.getInt(config, getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE);
+    this.dictPageSize = ConfigUtils.getInt(config, getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE);
+    this.dictionaryEnabled =
+        ConfigUtils.getBoolean(config, getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED);
+    this.validate = ConfigUtils.getBoolean(config, getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED);
+    String rootURI = ConfigUtils.getString(config, WRITER_FILE_SYSTEM_URI, LOCAL_FS_URI);
+    this.absoluteStagingFile = new Path(rootURI, stagingFile);
+    this.codecName = ConfigUtils.getString(config,getProperty(WRITER_CODEC_TYPE), DEFAULT_COMPRESSION_CODEC_NAME);
+    this.recordFormat = getRecordFormatFromConfig(config);
+    this.writerVersion = ConfigUtils.getString(config, getProperty(WRITER_PARQUET_VERSION), DEFAULT_WRITER_VERSION);
+    this.blockSize = blockSize;
+  }
+}
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterShim.java
similarity index 71%
copy from gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java
copy to gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterShim.java
index eb07977..9e64fda 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/ParquetWriterShim.java
@@ -14,21 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gobblin.test;
+package org.apache.gobblin.parquet.writer;
 
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.ToString;
+import java.io.Closeable;
+import java.io.IOException;
 
 
 /**
- * A Test record
+ * An interface that shields the gobblin-parquet-common integration from Parquet version-specific writer interfaces.
+ * @param <D> the type of record accepted by {@code write}
  */
-@Getter
-@ToString
-@AllArgsConstructor
-public class TestRecord {
-  private int partition;
-  private long sequence;
-  private String payload;
+public interface ParquetWriterShim<D> extends Closeable {
+  void write(D record)
+      throws IOException;
 }
diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/ParquetHdfsDataWriterTestBase.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/ParquetHdfsDataWriterTestBase.java
new file mode 100644
index 0000000..10fc1a8
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/ParquetHdfsDataWriterTestBase.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.parquet.writer.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.Assert;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.parquet.writer.ParquetRecordFormat;
+import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration;
+import org.apache.gobblin.test.TestRecord;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+import org.apache.gobblin.writer.Destination;
+import org.apache.gobblin.writer.WriterOutputFormat;
+
+
+/**
+ * Base class for building version-specific tests for Parquet
+ */
+@Slf4j
+public abstract class ParquetHdfsDataWriterTestBase {
+
+  public ParquetHdfsDataWriterTestBase(TestConstantsBase testConstants)
+  {
+    this.testConstants = testConstants;
+  }
+
+  private final TestConstantsBase testConstants;
+  private String filePath;
+  private DataWriter writer;
+
+  protected abstract DataWriterBuilder getDataWriterBuilder();
+
+  public void setUp()
+      throws Exception {
+    // Making the staging and/or output dirs if necessary
+    File stagingDir = new File(this.testConstants.TEST_STAGING_DIR);
+    File outputDir = new File(this.testConstants.TEST_OUTPUT_DIR);
+    if (!stagingDir.exists()) {
+      boolean mkdirs = stagingDir.mkdirs();
+      assert mkdirs;
+    }
+    if (!outputDir.exists()) {
+      boolean mkdirs = outputDir.mkdirs();
+      assert mkdirs;
+    }
+    this.filePath = getFilePath();
+  }
+
+  private String getFilePath() {
+    return TestConstantsBase.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstantsBase.TEST_EXTRACT_TABLE + "/"
+        + TestConstantsBase.TEST_EXTRACT_ID + "_" + TestConstantsBase.TEST_EXTRACT_PULL_TYPE;
+  }
+
+  private State createStateWithConfig(ParquetRecordFormat format) {
+    State properties = new State();
+    properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstantsBase.TEST_FS_URI);
+    properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstantsBase.TEST_STAGING_DIR);
+    properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstantsBase.TEST_OUTPUT_DIR);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, this.testConstants.getParquetTestFilename(format.name()));
+    properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_DICTIONARY, true);
+    properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
+    properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_PAGE_SIZE, 1024);
+    properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_VALIDATE, true);
+    properties.setProp(ParquetWriterConfiguration.WRITER_PARQUET_FORMAT, format.toString());
+    properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip");
+    return properties;
+  }
+
+
+  protected abstract List<TestRecord> readParquetRecordsFromFile(File outputFile, ParquetRecordFormat format) throws IOException;
+
+
+  public void testWrite()
+      throws Exception {
+    ParquetRecordFormat[] formats = ParquetRecordFormat.values();
+    for (ParquetRecordFormat format : formats) {
+      State formatSpecificProperties = createStateWithConfig(format);
+
+      this.writer = getDataWriterBuilder()
+          .writeTo(Destination.of(Destination.DestinationType.HDFS, formatSpecificProperties))
+          .withWriterId(TestConstantsBase.TEST_WRITER_ID)
+          .writeInFormat(WriterOutputFormat.PARQUET)
+          .withSchema(getSchema(format))
+          .build();
+
+      for (int i=0; i < 2; ++i) {
+        Object record = this.testConstants.getRecord(i, format);
+        this.writer.write(record);
+        Assert.assertEquals(i+1, this.writer.recordsWritten());
+      }
+      this.writer.close();
+      this.writer.commit();
+
+      String filePath = TestConstantsBase.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath;
+      File outputFile = new File(filePath, this.testConstants.getParquetTestFilename(format.name()));
+
+      List<TestRecord> records = readParquetRecordsFromFile(outputFile, format);
+      for (int i = 0; i < 2; ++i) {
+        TestRecord resultRecord = records.get(i);
+        log.debug("Testing {} record {}", i, resultRecord);
+        Assert.assertEquals(TestConstantsBase.getPayloadValues()[i], resultRecord.getPayload());
+        Assert.assertEquals(TestConstantsBase.getSequenceValues()[i], resultRecord.getSequence());
+        Assert.assertEquals(TestConstantsBase.getPartitionValues()[i], resultRecord.getPartition());
+      }
+    }
+  }
+
+  protected abstract Object getSchema(ParquetRecordFormat format);
+
+  public void tearDown()
+      throws IOException {
+    // Clean up the staging and/or output directories if necessary
+    File testRootDir = new File(TestConstantsBase.TEST_ROOT_DIR);
+    if (testRootDir.exists()) {
+      FileUtil.fullyDelete(testRootDir);
+    }
+  }
+
+}
diff --git a/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/TestConstantsBase.java b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/TestConstantsBase.java
new file mode 100644
index 0000000..3326022
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet-common/src/main/java/org/apache/gobblin/parquet/writer/test/TestConstantsBase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.parquet.writer.test;
+
+import java.util.Arrays;
+
+import org.apache.avro.generic.GenericRecord;
+
+import com.google.protobuf.Message;
+
+import org.apache.gobblin.parquet.writer.ParquetRecordFormat;
+import org.apache.gobblin.test.TestRecord;
+import org.apache.gobblin.test.proto.TestRecordProtos;
+
+
+/**
+ * Shared constants and sample records for the Parquet writer tests.
+ * @param <ParquetGroup> : the class that implements ParquetGroup, generic to allow package-specific overrides
+ */
+public abstract class TestConstantsBase<ParquetGroup> {
+
+  /** @return a copy of the sample records, so callers cannot modify the shared array */
+  public static TestRecord[] getTestValues() {
+    return Arrays.copyOf(TEST_VALUES, TEST_VALUES.length);
+  }
+
+  /** @return a copy of the payload values of the sample records */
+  public static String[] getPayloadValues() {
+    return Arrays.copyOf(PAYLOAD_VALUES, PAYLOAD_VALUES.length);
+  }
+
+  /** @return a copy of the sequence values of the sample records */
+  public static int[] getSequenceValues() {
+    return Arrays.copyOf(SEQUENCE_VALUES, SEQUENCE_VALUES.length);
+  }
+
+  /** @return a copy of the partition values of the sample records */
+  public static int[] getPartitionValues() {
+    return Arrays.copyOf(PARTITION_VALUES, PARTITION_VALUES.length);
+  }
+
+  /** @return the Parquet test file name built from the given format name */
+  public final String getParquetTestFilename(String format) {
+    return "test-" + format + ".parquet";
+  }
+
+  public static final String TEST_FS_URI = "file:///";
+
+  // Root directory name ends with the timestamp taken when this class is initialized
+  public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis();
+  public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
+  public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output";
+
+  public static final String TEST_WRITER_ID = "writer-1";
+
+  public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test";
+  public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis());
+  public static final String TEST_EXTRACT_TABLE = "TestTable";
+  public static final String TEST_EXTRACT_PULL_TYPE = "FULL";
+
+  public static final String PAYLOAD_FIELD_NAME = "payload";
+  public static final String SEQUENCE_FIELD_NAME = "sequence";
+  public static final String PARTITION_FIELD_NAME = "partition";
+
+  // Parallel arrays: index i holds the field values of sample record i
+  private static final String[] PAYLOAD_VALUES = {"value1", "value2"};
+  private static final int[] SEQUENCE_VALUES = {1, 2};
+  private static final int[] PARTITION_VALUES = {0, 1};
+  private static final TestRecord[] TEST_VALUES = new TestRecord[PAYLOAD_VALUES.length];
+  static {
+    // Read the arrays directly: the public getters would copy each array on every iteration
+    for (int i = 0; i < TEST_VALUES.length; ++i) {
+      TEST_VALUES[i] = new TestRecord(PARTITION_VALUES[i], SEQUENCE_VALUES[i], PAYLOAD_VALUES[i]);
+    }
+  }
+
+  /**
+   * Returns the sample record at {@code index} in the in-memory representation used for {@code format}.
+   * @return a Parquet group, a protobuf message or an Avro record carrying the sample values
+   * @throws IllegalArgumentException if the format is not handled here
+   */
+  public Object getRecord(int index, ParquetRecordFormat format) {
+    switch (format) {
+      case GROUP:
+        return convertToParquetGroup(TEST_VALUES[index]);
+      case PROTOBUF:
+        return getProtobufMessage(TEST_VALUES[index]);
+      case AVRO:
+        return getAvroMessage(TEST_VALUES[index]);
+      default:
+        throw new IllegalArgumentException("Not understanding format " + format);
+    }
+  }
+
+  /** Converts a test record into the Parquet group type chosen by the subclass. */
+  public abstract ParquetGroup convertToParquetGroup(TestRecord record);
+
+  /** Builds the protobuf message carrying the same field values as {@code testValue}. */
+  private Message getProtobufMessage(TestRecord testValue) {
+    return TestRecordProtos.TestRecord.newBuilder()
+        .setPayload(testValue.getPayload())
+        .setPartition(testValue.getPartition())
+        .setSequence(testValue.getSequence())
+        .build();
+  }
+
+  /** Builds the generated Avro record carrying the same field values as {@code record}. */
+  private GenericRecord getAvroMessage(TestRecord record) {
+    org.apache.gobblin.test.avro.TestRecord testRecord = new org.apache.gobblin.test.avro.TestRecord();
+    testRecord.setPayload(record.getPayload());
+    testRecord.setPartition(record.getPartition());
+    testRecord.setSequence(record.getSequence());
+    return testRecord;
+  }
+}
diff --git a/gobblin-modules/gobblin-parquet/build.gradle b/gobblin-modules/gobblin-parquet/build.gradle
index fb56bb1..cefd633 100644
--- a/gobblin-modules/gobblin-parquet/build.gradle
+++ b/gobblin-modules/gobblin-parquet/build.gradle
@@ -23,6 +23,8 @@ dependencies {
 
   compile externalDependency.gson
   compile externalDependency.twitterParquet
+  compile externalDependency.twitterParquetAvro
+  compile externalDependency.twitterParquetProto
 
   testCompile externalDependency.testng
   testCompile externalDependency.mockito
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
index 4a47792..a96e079 100644
--- a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
@@ -17,97 +17,101 @@
 package org.apache.gobblin.writer;
 
 import java.io.IOException;
-import java.util.Optional;
 
+import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import com.google.protobuf.Message;
 
+import lombok.extern.slf4j.Slf4j;
+import parquet.avro.AvroParquetWriter;
 import parquet.column.ParquetProperties;
 import parquet.example.data.Group;
 import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.api.WriteSupport;
 import parquet.hadoop.example.GroupWriteSupport;
 import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.proto.ProtoParquetWriter;
 import parquet.schema.MessageType;
 
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.parquet.writer.AbstractParquetDataWriterBuilder;
+import org.apache.gobblin.parquet.writer.ParquetWriterConfiguration;
+import org.apache.gobblin.parquet.writer.ParquetWriterShim;
 
-import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
-import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX;
-import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
-import static parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
-import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
-
-
-public class ParquetDataWriterBuilder extends FsDataWriterBuilder<MessageType, Group> {
-  public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize";
-  public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize";
-  public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary";
-  public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate";
-  public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version";
-  public static final String DEFAULT_PARQUET_WRITER = "v1";
-
-  @Override
-  public DataWriter<Group> build()
-      throws IOException {
-    Preconditions.checkNotNull(this.destination);
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
-    Preconditions.checkNotNull(this.schema);
-    Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET);
-
-    switch (this.destination.getType()) {
-      case HDFS:
-        return new ParquetHdfsDataWriter(this, this.destination.getProperties());
-      default:
-        throw new RuntimeException("Unknown destination type: " + this.destination.getType());
-    }
-  }
+@Slf4j
+public class ParquetDataWriterBuilder<S,D> extends AbstractParquetDataWriterBuilder<S,D> {
 
   /**
-   * Build a {@link ParquetWriter<Group>} for given file path with a block size.
-   * @param blockSize
-   * @param stagingFile
+   * Build a {@link ParquetWriterShim} over the {@link ParquetWriter} for the configured record format
+   * @param writerConfiguration staging file, codec, sizes, flags and record format for the writer
    * @return
    * @throws IOException
    */
-  public ParquetWriter<Group> getWriter(int blockSize, Path stagingFile)
+  @Override
+  public ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration writerConfiguration)
       throws IOException {
-    State state = this.destination.getProperties();
-    int pageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE);
-    int dictPageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE);
-    boolean enableDictionary =
-        state.getPropAsBoolean(getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED);
-    boolean validate = state.getPropAsBoolean(getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED);
-    String rootURI = state.getProp(WRITER_FILE_SYSTEM_URI, LOCAL_FS_URI);
-    Path absoluteStagingFile = new Path(rootURI, stagingFile);
-    CompressionCodecName codec = getCodecFromConfig();
-    GroupWriteSupport support = new GroupWriteSupport();
-    Configuration conf = new Configuration();
-    GroupWriteSupport.setSchema(this.schema, conf);
-    ParquetProperties.WriterVersion writerVersion = getWriterVersion();
-    return new ParquetWriter<>(absoluteStagingFile, support, codec, blockSize, pageSize, dictPageSize, enableDictionary,
-        validate, writerVersion, conf);
-  }
 
-  private ParquetProperties.WriterVersion getWriterVersion() {
-    return ParquetProperties.WriterVersion.fromString(
-        this.destination.getProperties().getProp(getProperty(WRITER_PARQUET_VERSION), DEFAULT_PARQUET_WRITER));
-  }
+    CompressionCodecName codecName = CompressionCodecName.fromConf(writerConfiguration.getCodecName());
+    ParquetProperties.WriterVersion writerVersion = ParquetProperties.WriterVersion
+        .fromString(writerConfiguration.getWriterVersion());
 
-  private CompressionCodecName getCodecFromConfig() {
-    State state = this.destination.getProperties();
-    String codecValue = Optional.ofNullable(state.getProp(getProperty(WRITER_CODEC_TYPE)))
-        .orElse(CompressionCodecName.SNAPPY.toString());
-    return CompressionCodecName.valueOf(codecValue.toUpperCase());
-  }
+    Configuration conf = new Configuration();
+    // Raw type on purpose: the record type written differs per record format
+    final ParquetWriter versionSpecificWriter;
+    switch (writerConfiguration.getRecordFormat()) {
+      case GROUP:
+        GroupWriteSupport.setSchema((MessageType) this.schema, conf);
+        versionSpecificWriter = new ParquetWriter<Group>(
+            writerConfiguration.getAbsoluteStagingFile(),
+            new GroupWriteSupport(),
+            codecName,
+            writerConfiguration.getBlockSize(),
+            writerConfiguration.getPageSize(),
+            writerConfiguration.getDictPageSize(),
+            writerConfiguration.isDictionaryEnabled(),
+            writerConfiguration.isValidate(),
+            writerVersion,
+            conf);
+        break;
+      case AVRO:
+        // Dictionary page size, validation and writer version are not passed to the Avro writer
+        versionSpecificWriter = new AvroParquetWriter(
+            writerConfiguration.getAbsoluteStagingFile(),
+            (Schema) this.schema,
+            codecName,
+            writerConfiguration.getBlockSize(),
+            writerConfiguration.getPageSize(),
+            writerConfiguration.isDictionaryEnabled(),
+            conf);
+        break;
+      case PROTOBUF:
+        // Dictionary page size, writer version and conf are not passed to the protobuf writer
+        versionSpecificWriter = new ProtoParquetWriter(
+            writerConfiguration.getAbsoluteStagingFile(),
+            (Class<? extends Message>) this.schema,
+            codecName,
+            writerConfiguration.getBlockSize(),
+            writerConfiguration.getPageSize(),
+            writerConfiguration.isDictionaryEnabled(),
+            writerConfiguration.isValidate());
+        break;
+      default:
+        throw new RuntimeException("Record format not supported: " + writerConfiguration.getRecordFormat());
+    }
+
+    // Callers only see write and close through the shim
+    return new ParquetWriterShim() {
+      @Override
+      public void write(Object record)
+          throws IOException {
+        versionSpecificWriter.write(record);
+      }
 
-  private String getProperty(String key) {
-    return ForkOperatorUtils.getPropertyNameForBranch(key, this.getBranches(), this.getBranch());
+      @Override
+      public void close()
+          throws IOException {
+        versionSpecificWriter.close();
+      }
+    };
   }
-}
+}
\ No newline at end of file
diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
index 086b084..0f0aadb 100644
--- a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
+++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
@@ -21,91 +21,115 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import parquet.avro.AvroParquetReader;
 import parquet.example.data.Group;
 import parquet.example.data.simple.convert.GroupRecordConverter;
 import parquet.hadoop.ParquetReader;
 import parquet.hadoop.api.InitContext;
 import parquet.hadoop.api.ReadSupport;
 import parquet.io.api.RecordMaterializer;
+import parquet.proto.ProtoParquetReader;
 import parquet.schema.MessageType;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_PAGE_SIZE;
-import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_VALIDATE;
+import org.apache.gobblin.parquet.writer.ParquetRecordFormat;
+import org.apache.gobblin.parquet.writer.test.ParquetHdfsDataWriterTestBase;
+import org.apache.gobblin.test.TestRecord;
+import org.apache.gobblin.test.proto.TestRecordProtos;
 
 
 @Test(groups = {"gobblin.writer"})
-public class ParquetHdfsDataWriterTest {
+public class ParquetHdfsDataWriterTest extends ParquetHdfsDataWriterTestBase {
 
-  private MessageType schema;
-  private String filePath;
-  private ParquetHdfsDataWriter writer;
-  private State properties;
+  public ParquetHdfsDataWriterTest() {
+    super(new TestConstants());
+  }
 
   @BeforeMethod
   public void setUp()
       throws Exception {
-    // Making the staging and/or output dirs if necessary
-    File stagingDir = new File(TestConstants.TEST_STAGING_DIR);
-    File outputDir = new File(TestConstants.TEST_OUTPUT_DIR);
-    if (!stagingDir.exists()) {
-      boolean mkdirs = stagingDir.mkdirs();
-      assert mkdirs;
-    }
-    if (!outputDir.exists()) {
-      boolean mkdirs = outputDir.mkdirs();
-      assert mkdirs;
-    }
-    this.schema = TestConstants.PARQUET_SCHEMA;
-    this.filePath = getFilePath();
-    this.properties = createStateWithConfig();
-    this.writer = (ParquetHdfsDataWriter) getParquetDataWriterBuilder().build();
+    super.setUp();
   }
 
-  private String getFilePath() {
-    return TestConstants.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstants.TEST_EXTRACT_TABLE + "/"
-        + TestConstants.TEST_EXTRACT_ID + "_" + TestConstants.TEST_EXTRACT_PULL_TYPE;
+  protected DataWriterBuilder getDataWriterBuilder() {
+    return new ParquetDataWriterBuilder();
   }
 
-  private State createStateWithConfig() {
-    State properties = new State();
-    properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE);
-    properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstants.TEST_FS_URI);
-    properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstants.TEST_STAGING_DIR);
-    properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstants.TEST_OUTPUT_DIR);
-    properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath);
-    properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, TestConstants.PARQUET_TEST_FILENAME);
-    properties.setProp(WRITER_PARQUET_DICTIONARY, true);
-    properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
-    properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024);
-    properties.setProp(WRITER_PARQUET_VALIDATE, true);
-    properties.setProp(ConfigurationKeys.WRITER_CODEC_TYPE, "gzip");
-    return properties;
+  /**
+   * Reads back all records of {@code outputFile} with the reader that matches {@code format}.
+   */
+  @Override
+  protected List<TestRecord> readParquetRecordsFromFile(File outputFile, ParquetRecordFormat format)
+      throws IOException {
+    switch (format) {
+      case GROUP:
+        return readParquetFilesGroup(outputFile);
+      case PROTOBUF:
+        return readParquetFilesProto(outputFile);
+      case AVRO:
+        return readParquetFilesAvro(outputFile);
+      default: throw new RuntimeException(format + " is not supported");
+    }
   }
 
-  private ParquetDataWriterBuilder getParquetDataWriterBuilder() {
-    ParquetDataWriterBuilder writerBuilder = new ParquetDataWriterBuilder();
-    writerBuilder.destination = Destination.of(Destination.DestinationType.HDFS, properties);
-    writerBuilder.writerId = TestConstants.TEST_WRITER_ID;
-    writerBuilder.schema = this.schema;
-    writerBuilder.format = WriterOutputFormat.PARQUET;
-    return writerBuilder;
+  /**
+   * Reads every record of an Avro-written Parquet file and maps it to a {@link TestRecord}.
+   *
+   * @param outputFile the Parquet file to read
+   * @return the records in the order the reader returns them
+   * @throws IOException if the file cannot be opened, read or closed
+   */
+  private List<TestRecord> readParquetFilesAvro(File outputFile)
+      throws IOException {
+    List<TestRecord> records = new ArrayList<>();
+    // try-with-resources closes the reader on every path; a failed close now fails the test
+    // instead of only being printed to stdout
+    try (ParquetReader<org.apache.gobblin.test.avro.TestRecord> reader =
+        new AvroParquetReader<>(new Path(outputFile.toString()))) {
+      org.apache.gobblin.test.avro.TestRecord value = reader.read();
+      // A null record ends the loop
+      while (value != null) {
+        records.add(new TestRecord(value.getPartition(), value.getSequence(), value.getPayload()));
+        value = reader.read();
+      }
+    }
+    return records;
   }
 
-  private List<Group> readParquetFiles(File outputFile)
+
+  /**
+   * Reads every record of a protobuf-written Parquet file and maps it to a {@link TestRecord}.
+   *
+   * @param outputFile the Parquet file to read
+   * @return the records in the order the reader returns them
+   * @throws IOException if the file cannot be opened, read or closed
+   */
+  protected List<TestRecord> readParquetFilesProto(File outputFile)
+      throws IOException {
+    List<TestRecord> records = new ArrayList<>();
+    // try-with-resources closes the reader on every path; a failed close now fails the test
+    // instead of only being printed to stdout
+    try (ParquetReader<TestRecordProtos.TestRecordOrBuilder> reader =
+        new ProtoParquetReader<>(new Path(outputFile.toString()))) {
+      TestRecordProtos.TestRecordOrBuilder value = reader.read();
+      // A null record ends the loop
+      while (value != null) {
+        records.add(
+            new TestRecord(value.getPartition(), value.getSequence(), value.getPayload()));
+        value = reader.read();
+      }
+    }
+    return records;
+  }
+
+  protected List<TestRecord> readParquetFilesGroup(File outputFile)
       throws IOException {
     ParquetReader<Group> reader = null;
     List<Group> records = new ArrayList<>();
@@ -123,46 +147,42 @@ public class ParquetHdfsDataWriterTest {
         }
       }
     }
-    return records;
+    return records.stream().map(value -> new TestRecord(
+        value.getInteger(TestConstants.PARTITION_FIELD_NAME, 0),
+        value.getInteger(TestConstants.SEQUENCE_FIELD_NAME, 0),
+        value.getString(TestConstants.PAYLOAD_FIELD_NAME, 0)
+    )).collect(Collectors.toList());
   }
 
+
   @Test
   public void testWrite()
       throws Exception {
-    long firstWrite;
-    long secondWrite;
-    List<Group> records;
-    Group record1 = TestConstants.PARQUET_RECORD_1;
-    Group record2 = TestConstants.PARQUET_RECORD_2;
-    String filePath = TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath;
-    File outputFile = new File(filePath, TestConstants.PARQUET_TEST_FILENAME);
-
-    this.writer.write(record1);
-    firstWrite = this.writer.recordsWritten();
-    this.writer.write(record2);
-    secondWrite = this.writer.recordsWritten();
-    this.writer.close();
-    this.writer.commit();
-    records = readParquetFiles(outputFile);
-    Group resultRecord1 = records.get(0);
-    Group resultRecord2 = records.get(1);
-
-    Assert.assertEquals(firstWrite, 1);
-    Assert.assertEquals(secondWrite, 2);
-    Assert.assertEquals(resultRecord1.getString("name", 0), "tilak");
-    Assert.assertEquals(resultRecord1.getInteger("age", 0), 22);
-    Assert.assertEquals(resultRecord2.getString("name", 0), "other");
-    Assert.assertEquals(resultRecord2.getInteger("age", 0), 22);
+    super.testWrite();
+  }
+
+  /**
+   * Supplies the schema object for the given format: the Parquet message type for GROUP,
+   * the generated protobuf class for PROTOBUF and the Avro class schema for AVRO.
+   */
+  @Override
+  protected Object getSchema(ParquetRecordFormat format) {
+    switch (format) {
+      case GROUP:
+        return TestConstants.PARQUET_SCHEMA;
+      case PROTOBUF:
+        return TestRecordProtos.TestRecord.class;
+      case AVRO:
+        return org.apache.gobblin.test.avro.TestRecord.getClassSchema();
+      default: throw new RuntimeException(format.name() + " is not implemented");
+    }
   }
 
+
   @AfterClass
   public void tearDown()
       throws IOException {
-    // Clean up the staging and/or output directories if necessary
-    File testRootDir = new File(TestConstants.TEST_ROOT_DIR);
-    if (testRootDir.exists()) {
-      FileUtil.fullyDelete(testRootDir);
-    }
+    super.tearDown();
   }
 
   class SimpleReadSupport extends ReadSupport<Group> {
diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
index e5bf215..6eb58dc 100644
--- a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
+++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -23,40 +23,28 @@ import parquet.schema.OriginalType;
 import parquet.schema.PrimitiveType;
 import parquet.schema.Types;
 
+import org.apache.gobblin.parquet.writer.test.TestConstantsBase;
+import org.apache.gobblin.test.TestRecord;
 
-public class TestConstants {
-  public static final MessageType PARQUET_SCHEMA = Types.buildMessage()
-      .addFields(Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name"),
-          Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("age")).named("User");
-
-  public static final Group PARQUET_RECORD_1 = new SimpleGroup(PARQUET_SCHEMA);
-
-  public static final Group PARQUET_RECORD_2 = new SimpleGroup(PARQUET_SCHEMA);
-
-  public static final String PARQUET_TEST_FILENAME = "test.parquet";
-
-  public static final String TEST_FS_URI = "file:///";
-
-  public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir") + "/" + System.currentTimeMillis();
-
-  public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
 
-  public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output";
+public class TestConstants extends TestConstantsBase<Group> {
 
-  public static final String TEST_WRITER_ID = "writer-1";
-
-  public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test";
-
-  public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis());
-
-  public static final String TEST_EXTRACT_TABLE = "TestTable";
-
-  public static final String TEST_EXTRACT_PULL_TYPE = "FULL";
-
-  static {
-    PARQUET_RECORD_1.add("name", "tilak");
-    PARQUET_RECORD_1.add("age", 22);
-    PARQUET_RECORD_2.add("name", "other");
-    PARQUET_RECORD_2.add("age", 22);
+  public static final MessageType PARQUET_SCHEMA = Types.buildMessage()
+      .addFields(
+          Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
+              .named(TestConstants.PAYLOAD_FIELD_NAME),
+          Types.required(PrimitiveType.PrimitiveTypeName.INT32).named(TestConstants.PARTITION_FIELD_NAME),
+          // Sequence field is INT32 instead of INT64, because this version of parquet only supports INT32
+          Types.required(PrimitiveType.PrimitiveTypeName.INT32).named(TestConstants.SEQUENCE_FIELD_NAME))
+      .named("Data");
+
+  @Override
+  public Group convertToParquetGroup(TestRecord record) {
+    Group group = new SimpleGroup(PARQUET_SCHEMA);
+    group.add(PAYLOAD_FIELD_NAME, record.getPayload());
+    group.add(SEQUENCE_FIELD_NAME, Math.toIntExact(record.getSequence())); // overflow fails instead of truncating
+    group.add(PARTITION_FIELD_NAME, record.getPartition());
+    return group;
   }
-}
+
+}
\ No newline at end of file
diff --git a/gobblin-test-utils/build.gradle b/gobblin-test-utils/build.gradle
index 3c42427..2402ac7 100644
--- a/gobblin-test-utils/build.gradle
+++ b/gobblin-test-utils/build.gradle
@@ -14,6 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+plugins {
+  id "com.google.protobuf" version "0.8.8"
+  id "com.commercehub.gradle.plugin.avro-base" version "0.9.0"
+}
 
 apply plugin: 'java'
 
@@ -28,13 +32,72 @@ dependencies {
   compile externalDependency.lombok
   compile externalDependency.typesafeConfig
   compile externalDependency.findBugsAnnotations
+  compile externalDependency.protobuf
+
 
   testCompile externalDependency.testng
   testCompile externalDependency.mockito
 }
 
+sourceSets {
+  main {
+    java {
+      srcDir 'src/main/gen-proto'
+      srcDir 'src/main/gen-avro'
+    }
+    resources {
+      srcDir 'src/main/proto'
+      srcDir 'src/main/avro'
+    }
+
+  }
+}
+
+/**
+clean.doFirst {
+  delete file("${projectDir}/src/main/gen-proto/").listFiles()
+  delete file("${projectDir}/src/main/gen-avro/").listFiles()
+}
+**/
+
 test {
   workingDir rootProject.rootDir
 }
 
+protobuf {
+  generatedFilesBaseDir = "$projectDir/src/main/gen-proto"
+  protoc {
+    artifact = "com.google.protobuf:protoc:3.6.1"
+  }
+  plugins {
+    grpc {
+      artifact = "io.grpc:protoc-gen-grpc-java:1.19.0"
+    }
+  }
+  generateProtoTasks {
+    all()*.plugins {
+      grpc {}
+    }
+  }
+}
+
+avro {
+  stringType = "string"
+}
+
+task generateAvro(type: com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask) {
+  source("src/main/avro")
+  outputDir = file("src/main/gen-avro")
+}
+
+compileJava {
+  dependsOn tasks.generateAvro
+}
+
+checkstyleMain.source = ['src/main/java','src/test/java']
+
+findbugs {
+  excludeFilter = file("buildConfig/findbugs-exclude-filter.xml")
+}
+
 ext.classification="library"
diff --git a/gobblin-test-utils/buildConfig/findbugs-exclude-filter.xml b/gobblin-test-utils/buildConfig/findbugs-exclude-filter.xml
new file mode 100644
index 0000000..61b34c0
--- /dev/null
+++ b/gobblin-test-utils/buildConfig/findbugs-exclude-filter.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<FindBugsFilter>
+    <Match>
+        <Package name="~org\.apache\.gobblin\.test\.proto.*"/>
+    </Match>
+</FindBugsFilter>
diff --git a/gobblin-test-utils/src/main/avro/TestRecord.avsc b/gobblin-test-utils/src/main/avro/TestRecord.avsc
new file mode 100644
index 0000000..0a5d520
--- /dev/null
+++ b/gobblin-test-utils/src/main/avro/TestRecord.avsc
@@ -0,0 +1,23 @@
+{
+  "type": "record",
+  "name": "TestRecord",
+  "namespace" : "org.apache.gobblin.test.avro",
+  "fields": [
+    {
+      "name": "partition",
+      "type": "int",
+      "doc": "Partition id for the record"
+    },
+    {
+      "name": "sequence",
+      "type": "long",
+      "doc": "The sequence number"
+    },
+    {
+      "name": "payload",
+      "type": "string",
+      "default" : "",
+      "doc": "Human-readable payload"
+    }
+  ]
+}
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java b/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestRecord.java
similarity index 100%
rename from gobblin-core-base/src/main/java/org/apache/gobblin/test/TestRecord.java
rename to gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestRecord.java
diff --git a/gobblin-test-utils/src/main/proto/TestRecord.proto b/gobblin-test-utils/src/main/proto/TestRecord.proto
new file mode 100644
index 0000000..53bafbd
--- /dev/null
+++ b/gobblin-test-utils/src/main/proto/TestRecord.proto
@@ -0,0 +1,11 @@
+syntax = "proto2";
+
+option java_package = "org.apache.gobblin.test.proto";
+option java_outer_classname = "TestRecordProtos";
+
+
+message TestRecord {
+  required int32 partition = 1;
+  required int64 sequence = 2;
+  required string payload = 3;
+}
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index f2836f7..7c83a79 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -130,7 +130,7 @@ ext.externalDependency = [
     "curatorTest": "org.apache.curator:curator-test:2.10.0",
     "hamcrest": "org.hamcrest:hamcrest-all:1.3",
     "joptSimple": "net.sf.jopt-simple:jopt-simple:4.9",
-    "protobuf": "com.google.protobuf:protobuf-java:2.5.0",
+    "protobuf": "com.google.protobuf:protobuf-java:3.6.1",
     "pegasus" : [
         "data" : "com.linkedin.pegasus:data:" + pegasusVersion,
         "generator" : "com.linkedin.pegasus:generator:" + pegasusVersion,
@@ -169,7 +169,11 @@ ext.externalDependency = [
     "hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
     "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.1",
     'parquet': 'org.apache.parquet:parquet-hadoop:1.10.1',
+    'parquetAvro': 'org.apache.parquet:parquet-avro:1.10.1',
+    'parquetProto': 'org.apache.parquet:parquet-protobuf:1.10.1',
     'twitterParquet': 'com.twitter:parquet-hadoop-bundle:1.5.0',
+    'twitterParquetAvro': 'com.twitter:parquet-avro:1.5.0',
+    'twitterParquetProto': 'com.twitter:parquet-protobuf:1.5.0',
     'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
     "slf4j": [
         "org.slf4j:slf4j-api:" + slf4jVersion,
@@ -182,6 +186,7 @@ ext.externalDependency = [
     ],
     "postgresConnector": "org.postgresql:postgresql:42.1.4",
     "assertj": 'org.assertj:assertj-core:3.8.0',
+    "protobuf": 'com.google.protobuf:protobuf-java:3.6.1',
 ]
 
 if (!isDefaultEnvironment)