You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/10/05 19:11:33 UTC

incubator-gobblin git commit: [GOBBLIN-255] ParquetHdfsDataWriter

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 8af87cb78 -> 312e768f5


[GOBBLIN-255] ParquetHdfsDataWriter

Closes #2106 from tilakpatidar/parquet_writer


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/312e768f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/312e768f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/312e768f

Branch: refs/heads/master
Commit: 312e768f564e7cb4619c7986cfdf9b0f828bbc7b
Parents: 8af87cb
Author: tilakpatidar <ti...@gmail.com>
Authored: Thu Oct 5 12:11:23 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Thu Oct 5 12:11:23 2017 -0700

----------------------------------------------------------------------
 .../apache/gobblin/writer/TestConstants.java    |  14 +-
 .../Configuration-Properties-Glossary.md        |  35 ++++
 gobblin-modules/gobblin-parquet/build.gradle    |  43 +++++
 .../writer/ParquetDataWriterBuilder.java        | 112 ++++++++++++
 .../gobblin/writer/ParquetHdfsDataWriter.java   |  70 ++++++++
 .../writer/ParquetHdfsDataWriterTest.java       | 178 +++++++++++++++++++
 .../apache/gobblin/writer/TestConstants.java    |  62 +++++++
 7 files changed, 505 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java
index 8af13b8..ce0e9ab 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -25,15 +25,11 @@ package org.apache.gobblin.writer;
 public class TestConstants {
 
   // Test Avro schema
-  public static final String AVRO_SCHEMA = "{\"namespace\": \"example.avro\",\n" +
-      " \"type\": \"record\",\n" +
-      " \"name\": \"User\",\n" +
-      " \"fields\": [\n" +
-      "     {\"name\": \"name\", \"type\": \"string\"},\n" +
-      "     {\"name\": \"favorite_number\",  \"type\": \"int\"},\n" +
-      "     {\"name\": \"favorite_color\", \"type\": \"string\"}\n" +
-      " ]\n" +
-      "}";
+  public static final String AVRO_SCHEMA =
+      "{\"namespace\": \"example.avro\",\n" + " \"type\": \"record\",\n" + " \"name\": \"User\",\n" + " \"fields\": [\n"
+          + "     {\"name\": \"name\", \"type\": \"string\"},\n"
+          + "     {\"name\": \"favorite_number\",  \"type\": \"int\"},\n"
+          + "     {\"name\": \"favorite_color\", \"type\": \"string\"}\n" + " ]\n" + "}";
 
   // Test Avro data in json format
   public static final String[] JSON_RECORDS =

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-docs/user-guide/Configuration-Properties-Glossary.md
----------------------------------------------------------------------
diff --git a/gobblin-docs/user-guide/Configuration-Properties-Glossary.md b/gobblin-docs/user-guide/Configuration-Properties-Glossary.md
index e275ca2..363d873 100644
--- a/gobblin-docs/user-guide/Configuration-Properties-Glossary.md
+++ b/gobblin-docs/user-guide/Configuration-Properties-Glossary.md
@@ -1059,6 +1059,41 @@ This is used to control the writer creation. If the value is set to true, writer
 False 
 ###### Required
 No
+#### writer.parquet.page.size
+###### Description
+The page size threshold
+###### Default Value
+1048576
+###### Required
+No
+#### writer.parquet.dictionary.page.size
+###### Description
+The block size threshold.
+###### Default Value
+134217728
+###### Required
+No
+#### writer.parquet.dictionary
+###### Description
+To turn dictionary encoding on.
+###### Default Value
+true
+###### Required
+No
+#### writer.parquet.validate
+###### Description
+To turn on validation using the schema.
+###### Default Value
+false
+###### Required
+No
+#### writer.parquet.version
+###### Description
+Version of parquet writer to use. Available versions are v1 and v2.
+###### Default Value
+v1
+###### Required
+No
 # Data Publisher Properties <a name="Data-Publisher-Properties"></a>
 #### data.publisher.type 
 ###### Description

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/build.gradle b/gobblin-modules/gobblin-parquet/build.gradle
new file mode 100644
index 0000000..e43f543
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/build.gradle
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'java'
+
+dependencies {
+  compile project(":gobblin-core")
+
+  compile externalDependency.parquet
+
+  testCompile externalDependency.testng
+  testCompile externalDependency.mockito
+  testCompile externalDependency.mockRunnerJdbc
+}
+
+configurations {
+  compile { transitive = false }
+  // Remove xerces dependencies because of versioning issues. Standard JRE implementation should
+  // work. See also http://stackoverflow.com/questions/11677572/dealing-with-xerces-hell-in-java-maven
+  // HADOOP-5254 and MAPREDUCE-5664
+  all*.exclude group: 'xml-apis'
+  all*.exclude group: 'xerces'
+}
+
+test {
+  workingDir rootProject.rootDir
+}
+
+ext.classification="library"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
new file mode 100644
index 0000000..7ce2020
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import parquet.column.ParquetProperties;
+import parquet.example.data.Group;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.example.GroupWriteSupport;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.MessageType;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX;
+import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
+import static parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
+import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+
+
+public class ParquetDataWriterBuilder extends FsDataWriterBuilder<MessageType, Group> {
+  public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize";
+  public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize";
+  public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary";
+  public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate";
+  public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version";
+  public static final String DEFAULT_PARQUET_WRITER = "v1";
+
+  @Override
+  public DataWriter<Group> build()
+      throws IOException {
+    Preconditions.checkNotNull(this.destination);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId));
+    Preconditions.checkNotNull(this.schema);
+    Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET);
+
+    switch (this.destination.getType()) {
+      case HDFS:
+        return new ParquetHdfsDataWriter(this, this.destination.getProperties());
+      default:
+        throw new RuntimeException("Unknown destination type: " + this.destination.getType());
+    }
+  }
+
+  /**
+   * Build a {@link ParquetWriter<Group>} for given file path with a block size.
+   * @param blockSize
+   * @param stagingFile
+   * @return
+   * @throws IOException
+   */
+  public ParquetWriter<Group> getWriter(int blockSize, Path stagingFile)
+      throws IOException {
+    State state = this.destination.getProperties();
+    int pageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE);
+    int dictPageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE);
+    boolean enableDictionary =
+        state.getPropAsBoolean(getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED);
+    boolean validate = state.getPropAsBoolean(getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED);
+    String rootURI = state.getProp(WRITER_FILE_SYSTEM_URI, LOCAL_FS_URI);
+    Path absoluteStagingFile = new Path(rootURI, stagingFile);
+    CompressionCodecName codec = getCodecFromConfig();
+    GroupWriteSupport support = new GroupWriteSupport();
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(this.schema, conf);
+    ParquetProperties.WriterVersion writerVersion = getWriterVersion();
+    return new ParquetWriter<>(absoluteStagingFile, support, codec, blockSize, pageSize, dictPageSize, enableDictionary,
+        validate, writerVersion, conf);
+  }
+
+  private ParquetProperties.WriterVersion getWriterVersion() {
+    return ParquetProperties.WriterVersion.fromString(
+        this.destination.getProperties().getProp(getProperty(WRITER_PARQUET_VERSION), DEFAULT_PARQUET_WRITER));
+  }
+
+  private CompressionCodecName getCodecFromConfig() {
+    State state = this.destination.getProperties();
+    String codecValue = Optional.ofNullable(state.getProp(getProperty(WRITER_CODEC_TYPE)))
+        .orElse(CompressionCodecName.SNAPPY.toString());
+    return CompressionCodecName.valueOf(codecValue.toUpperCase());
+  }
+
+  private String getProperty(String key) {
+    return ForkOperatorUtils.getPropertyNameForBranch(key, this.getBranches(), this.getBranch());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
new file mode 100644
index 0000000..a775bc2
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+import parquet.example.data.Group;
+import parquet.hadoop.ParquetWriter;
+
+
+/**
+ * An extension to {@link FsDataWriter} that writes in Parquet format in the form of {@link Group}s.
+ *
+ * <p>
+ *   This implementation allows users to specify the {@link parquet.hadoop.CodecFactory} to use through the configuration
+ *   property {@link ConfigurationKeys#WRITER_CODEC_TYPE}. By default, the deflate codec is used.
+ * </p>
+ *
+ * @author tilakpatidar
+ */
+public class ParquetHdfsDataWriter extends FsDataWriter<Group> {
+  private final ParquetWriter<Group> writer;
+  protected final AtomicLong count = new AtomicLong(0);
+
+  public ParquetHdfsDataWriter(ParquetDataWriterBuilder builder, State state)
+      throws IOException {
+    super(builder, state);
+    this.writer = builder.getWriter((int) this.blockSize, this.stagingFile);
+  }
+
+  @Override
+  public void write(Group record)
+      throws IOException {
+    this.writer.write(record);
+    this.count.incrementAndGet();
+  }
+
+  @Override
+  public long recordsWritten() {
+    return this.count.get();
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    try {
+      this.writer.close();
+    } finally {
+      super.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
new file mode 100644
index 0000000..40c638c
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import parquet.example.data.Group;
+import parquet.example.data.simple.convert.GroupRecordConverter;
+import parquet.hadoop.ParquetReader;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY;
+import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE;
+import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_PAGE_SIZE;
+import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_VALIDATE;
+
+
+@Test(groups = {"gobblin.writer"})
+public class ParquetHdfsDataWriterTest {
+
+  private MessageType schema;
+  private String filePath;
+  private ParquetHdfsDataWriter writer;
+  private State properties;
+
+  @BeforeMethod
+  public void setUp()
+      throws Exception {
+    // Making the staging and/or output dirs if necessary
+    File stagingDir = new File(TestConstants.TEST_STAGING_DIR);
+    File outputDir = new File(TestConstants.TEST_OUTPUT_DIR);
+    if (!stagingDir.exists()) {
+      boolean mkdirs = stagingDir.mkdirs();
+      assert mkdirs;
+    }
+    if (!outputDir.exists()) {
+      boolean mkdirs = outputDir.mkdirs();
+      assert mkdirs;
+    }
+    this.schema = TestConstants.PARQUET_SCHEMA;
+    this.filePath = getFilePath();
+    this.properties = createStateWithConfig();
+    this.writer = (ParquetHdfsDataWriter) getParquetDataWriterBuilder().build();
+  }
+
+  private String getFilePath() {
+    return TestConstants.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstants.TEST_EXTRACT_TABLE + "/"
+        + TestConstants.TEST_EXTRACT_ID + "_" + TestConstants.TEST_EXTRACT_PULL_TYPE;
+  }
+
+  private State createStateWithConfig() {
+    State properties = new State();
+    properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstants.TEST_FS_URI);
+    properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstants.TEST_STAGING_DIR);
+    properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstants.TEST_OUTPUT_DIR);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, TestConstants.PARQUET_TEST_FILENAME);
+    properties.setProp(WRITER_PARQUET_DICTIONARY, true);
+    properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
+    properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024);
+    properties.setProp(WRITER_PARQUET_VALIDATE, true);
+    return properties;
+  }
+
+  private ParquetDataWriterBuilder getParquetDataWriterBuilder() {
+    ParquetDataWriterBuilder writerBuilder = new ParquetDataWriterBuilder();
+    writerBuilder.destination = Destination.of(Destination.DestinationType.HDFS, properties);
+    writerBuilder.writerId = TestConstants.TEST_WRITER_ID;
+    writerBuilder.schema = this.schema;
+    writerBuilder.format = WriterOutputFormat.PARQUET;
+    return writerBuilder;
+  }
+
+  private List<Group> readParquetFiles(File outputFile)
+      throws IOException {
+    ParquetReader<Group> reader = null;
+    List<Group> records = new ArrayList<>();
+    try {
+      reader = new ParquetReader<>(new Path(outputFile.toString()), new SimpleReadSupport());
+      for (Group value = reader.read(); value != null; value = reader.read()) {
+        records.add(value);
+      }
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (Exception ex) {
+          System.out.println(ex.getMessage());
+        }
+      }
+    }
+    return records;
+  }
+
+  @Test
+  public void testWrite()
+      throws Exception {
+    long firstWrite;
+    long secondWrite;
+    List<Group> records;
+    Group record1 = TestConstants.PARQUET_RECORD_1;
+    Group record2 = TestConstants.PARQUET_RECORD_2;
+    String filePath = TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath;
+    File outputFile = new File(filePath, TestConstants.PARQUET_TEST_FILENAME);
+
+    this.writer.write(record1);
+    firstWrite = this.writer.recordsWritten();
+    this.writer.write(record2);
+    secondWrite = this.writer.recordsWritten();
+    this.writer.close();
+    this.writer.commit();
+    records = readParquetFiles(outputFile);
+    Group resultRecord1 = records.get(0);
+    Group resultRecord2 = records.get(1);
+
+    Assert.assertEquals(firstWrite, 1);
+    Assert.assertEquals(secondWrite, 2);
+    Assert.assertEquals(resultRecord1.getString("name", 0), "tilak");
+    Assert.assertEquals(resultRecord1.getInteger("age", 0), 22);
+    Assert.assertEquals(resultRecord2.getString("name", 0), "other");
+    Assert.assertEquals(resultRecord2.getInteger("age", 0), 22);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    // Clean up the staging and/or output directories if necessary
+    File testRootDir = new File(TestConstants.TEST_ROOT_DIR);
+    if (testRootDir.exists()) {
+      FileUtil.fullyDelete(testRootDir);
+    }
+  }
+
+  class SimpleReadSupport extends ReadSupport<Group> {
+    @Override
+    public RecordMaterializer<Group> prepareForRead(Configuration conf, Map<String, String> metaData,
+        MessageType schema, ReadContext context) {
+      return new GroupRecordConverter(schema);
+    }
+
+    @Override
+    public ReadContext init(InitContext context) {
+      return new ReadContext(context.getFileSchema());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
new file mode 100644
index 0000000..6144aaf
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import parquet.example.data.Group;
+import parquet.example.data.simple.SimpleGroup;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.Types;
+
+
+public class TestConstants {
+  public static final MessageType PARQUET_SCHEMA = Types.buildMessage()
+      .addFields(Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name"),
+          Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("age")).named("User");
+
+  public static final Group PARQUET_RECORD_1 = new SimpleGroup(PARQUET_SCHEMA);
+
+  public static final Group PARQUET_RECORD_2 = new SimpleGroup(PARQUET_SCHEMA);
+
+  public static final String PARQUET_TEST_FILENAME = "test.parquet";
+
+  public static final String TEST_FS_URI = "file:///";
+
+  public static final String TEST_ROOT_DIR = System.getProperty("java.io.tmpdir");
+
+  public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
+
+  public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output";
+
+  public static final String TEST_WRITER_ID = "writer-1";
+
+  public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test";
+
+  public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis());
+
+  public static final String TEST_EXTRACT_TABLE = "TestTable";
+
+  public static final String TEST_EXTRACT_PULL_TYPE = "FULL";
+
+  static {
+    PARQUET_RECORD_1.add("name", "tilak");
+    PARQUET_RECORD_1.add("age", 22);
+    PARQUET_RECORD_2.add("name", "other");
+    PARQUET_RECORD_2.add("age", 22);
+  }
+}