You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/10/05 19:11:33 UTC
incubator-gobblin git commit: [GOBBLIN-255] ParquetHdfsDataWriter
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 8af87cb78 -> 312e768f5
[GOBBLIN-255] ParquetHdfsDataWriter
Closes #2106 from tilakpatidar/parquet_writer
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/312e768f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/312e768f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/312e768f
Branch: refs/heads/master
Commit: 312e768f564e7cb4619c7986cfdf9b0f828bbc7b
Parents: 8af87cb
Author: tilakpatidar <ti...@gmail.com>
Authored: Thu Oct 5 12:11:23 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Thu Oct 5 12:11:23 2017 -0700
----------------------------------------------------------------------
.../apache/gobblin/writer/TestConstants.java | 14 +-
.../Configuration-Properties-Glossary.md | 35 ++++
gobblin-modules/gobblin-parquet/build.gradle | 43 +++++
.../writer/ParquetDataWriterBuilder.java | 112 ++++++++++++
.../gobblin/writer/ParquetHdfsDataWriter.java | 70 ++++++++
.../writer/ParquetHdfsDataWriterTest.java | 178 +++++++++++++++++++
.../apache/gobblin/writer/TestConstants.java | 62 +++++++
7 files changed, 505 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java
index 8af13b8..ce0e9ab 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -25,15 +25,11 @@ package org.apache.gobblin.writer;
public class TestConstants {
// Test Avro schema
- public static final String AVRO_SCHEMA = "{\"namespace\": \"example.avro\",\n" +
- " \"type\": \"record\",\n" +
- " \"name\": \"User\",\n" +
- " \"fields\": [\n" +
- " {\"name\": \"name\", \"type\": \"string\"},\n" +
- " {\"name\": \"favorite_number\", \"type\": \"int\"},\n" +
- " {\"name\": \"favorite_color\", \"type\": \"string\"}\n" +
- " ]\n" +
- "}";
+ public static final String AVRO_SCHEMA =
+ "{\"namespace\": \"example.avro\",\n" + " \"type\": \"record\",\n" + " \"name\": \"User\",\n" + " \"fields\": [\n"
+ + " {\"name\": \"name\", \"type\": \"string\"},\n"
+ + " {\"name\": \"favorite_number\", \"type\": \"int\"},\n"
+ + " {\"name\": \"favorite_color\", \"type\": \"string\"}\n" + " ]\n" + "}";
// Test Avro data in json format
public static final String[] JSON_RECORDS =
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-docs/user-guide/Configuration-Properties-Glossary.md
----------------------------------------------------------------------
diff --git a/gobblin-docs/user-guide/Configuration-Properties-Glossary.md b/gobblin-docs/user-guide/Configuration-Properties-Glossary.md
index e275ca2..363d873 100644
--- a/gobblin-docs/user-guide/Configuration-Properties-Glossary.md
+++ b/gobblin-docs/user-guide/Configuration-Properties-Glossary.md
@@ -1059,6 +1059,41 @@ This is used to control the writer creation. If the value is set to true, writer
False
###### Required
No
+#### writer.parquet.pageSize
+###### Description
+The Parquet page size threshold, in bytes.
+###### Default Value
+1048576
+###### Required
+No
+#### writer.parquet.dictionaryPageSize
+###### Description
+The Parquet dictionary page size threshold, in bytes.
+###### Default Value
+134217728
+###### Required
+No
+#### writer.parquet.dictionary
+###### Description
+Whether to enable dictionary encoding.
+###### Default Value
+true
+###### Required
+No
+#### writer.parquet.validate
+###### Description
+Whether to validate records against the schema while writing.
+###### Default Value
+false
+###### Required
+No
+#### writer.parquet.version
+###### Description
+Version of the Parquet writer to use. Available versions are v1 and v2.
+###### Default Value
+v1
+###### Required
+No
# Data Publisher Properties <a name="Data-Publisher-Properties"></a>
#### data.publisher.type
###### Description
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/build.gradle b/gobblin-modules/gobblin-parquet/build.gradle
new file mode 100644
index 0000000..e43f543
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/build.gradle
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'java'
+
+// Dependencies of the gobblin-parquet module.
+dependencies {
+ compile project(":gobblin-core")
+
+ // Parquet library providing the parquet.* packages used by the writer.
+ compile externalDependency.parquet
+
+ testCompile externalDependency.testng
+ // NOTE(review): the tests in this module import neither Mockito nor MockRunner JDBC; confirm whether these two
+ // test dependencies are needed.
+ testCompile externalDependency.mockito
+ testCompile externalDependency.mockRunnerJdbc
+}
+
+configurations {
+ // Resolve only the directly declared 'compile' dependencies, not their transitive dependencies.
+ compile { transitive = false }
+ // Remove xerces dependencies because of versioning issues. Standard JRE implementation should
+ // work. See also http://stackoverflow.com/questions/11677572/dealing-with-xerces-hell-in-java-maven
+ // HADOOP-5254 and MAPREDUCE-5664
+ all*.exclude group: 'xml-apis'
+ all*.exclude group: 'xerces'
+}
+
+// Run the tests from the root project directory.
+test {
+ workingDir rootProject.rootDir
+}
+
+// Extra project property; presumably read elsewhere in the build to classify this module as a library — confirm.
+ext.classification="library"
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
new file mode 100644
index 0000000..7ce2020
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+import parquet.column.ParquetProperties;
+import parquet.example.data.Group;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.example.GroupWriteSupport;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.MessageType;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.LOCAL_FS_URI;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_CODEC_TYPE;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_FILE_SYSTEM_URI;
+import static org.apache.gobblin.configuration.ConfigurationKeys.WRITER_PREFIX;
+import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static parquet.hadoop.ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED;
+import static parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
+import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+
+
+/**
+ * A {@link FsDataWriterBuilder} that builds {@link ParquetHdfsDataWriter}s writing Parquet {@link Group} records
+ * described by a {@link MessageType} schema.
+ *
+ * <p>
+ * Writer settings are read from the destination {@link State} using the keys declared below. Every key, including
+ * the writer file system URI and the codec, is resolved per fork branch via
+ * {@link ForkOperatorUtils#getPropertyNameForBranch}.
+ * </p>
+ */
+public class ParquetDataWriterBuilder extends FsDataWriterBuilder<MessageType, Group> {
+  /** Page size threshold in bytes; defaults to {@link ParquetWriter#DEFAULT_PAGE_SIZE}. */
+  public static final String WRITER_PARQUET_PAGE_SIZE = WRITER_PREFIX + ".parquet.pageSize";
+  /** Dictionary page size threshold in bytes; defaults to {@link ParquetWriter#DEFAULT_BLOCK_SIZE}. */
+  public static final String WRITER_PARQUET_DICTIONARY_PAGE_SIZE = WRITER_PREFIX + ".parquet.dictionaryPageSize";
+  /** Whether dictionary encoding is enabled; defaults to {@link ParquetWriter#DEFAULT_IS_DICTIONARY_ENABLED}. */
+  public static final String WRITER_PARQUET_DICTIONARY = WRITER_PREFIX + ".parquet.dictionary";
+  /** Whether records are validated against the schema; defaults to {@link ParquetWriter#DEFAULT_IS_VALIDATING_ENABLED}. */
+  public static final String WRITER_PARQUET_VALIDATE = WRITER_PREFIX + ".parquet.validate";
+  /** Parquet writer version, e.g. {@code v1} or {@code v2}. */
+  public static final String WRITER_PARQUET_VERSION = WRITER_PREFIX + ".parquet.version";
+  /** Writer version used when {@link #WRITER_PARQUET_VERSION} is not set. */
+  public static final String DEFAULT_PARQUET_WRITER = "v1";
+
+  /**
+   * Builds a Parquet {@link DataWriter} for the configured destination.
+   *
+   * @return a new {@link ParquetHdfsDataWriter}
+   * @throws IOException if the underlying writer cannot be created
+   * @throws RuntimeException if the destination type is not {@code HDFS}
+   */
+  @Override
+  public DataWriter<Group> build()
+      throws IOException {
+    Preconditions.checkNotNull(this.destination, "destination must be set");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(this.writerId), "writerId must be set");
+    Preconditions.checkNotNull(this.schema, "schema must be set");
+    Preconditions.checkArgument(this.format == WriterOutputFormat.PARQUET,
+        "Unsupported writer output format: %s", this.format);
+
+    switch (this.destination.getType()) {
+      case HDFS:
+        return new ParquetHdfsDataWriter(this, this.destination.getProperties());
+      default:
+        throw new RuntimeException("Unknown destination type: " + this.destination.getType());
+    }
+  }
+
+  /**
+   * Build a {@link ParquetWriter} of {@link Group}s for the given staging file.
+   *
+   * @param blockSize row group (block) size in bytes handed to the {@link ParquetWriter}
+   * @param stagingFile staging file, resolved against the branch-specific writer file system URI
+   * @return a new {@link ParquetWriter} writing to the resolved staging file
+   * @throws IOException if the {@link ParquetWriter} cannot be created
+   */
+  public ParquetWriter<Group> getWriter(int blockSize, Path stagingFile)
+      throws IOException {
+    State state = this.destination.getProperties();
+    int pageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_PAGE_SIZE), DEFAULT_PAGE_SIZE);
+    // NOTE(review): parquet's own convenience constructors default the dictionary page size to the page size;
+    // the block-size default is kept here to match the documented default, but may permit very large dictionaries.
+    int dictPageSize = state.getPropAsInt(getProperty(WRITER_PARQUET_DICTIONARY_PAGE_SIZE), DEFAULT_BLOCK_SIZE);
+    boolean enableDictionary =
+        state.getPropAsBoolean(getProperty(WRITER_PARQUET_DICTIONARY), DEFAULT_IS_DICTIONARY_ENABLED);
+    boolean validate = state.getPropAsBoolean(getProperty(WRITER_PARQUET_VALIDATE), DEFAULT_IS_VALIDATING_ENABLED);
+    // Resolve the file system URI per branch, like every other writer key read by this builder; reading the
+    // unbranched key made forked writers fall back to the wrong (or local) file system.
+    String rootURI = state.getProp(getProperty(WRITER_FILE_SYSTEM_URI), LOCAL_FS_URI);
+    Path absoluteStagingFile = new Path(rootURI, stagingFile);
+    CompressionCodecName codec = getCodecFromConfig();
+    GroupWriteSupport support = new GroupWriteSupport();
+    Configuration conf = new Configuration();
+    // The schema reaches GroupWriteSupport through this Configuration, which is passed to the writer below.
+    GroupWriteSupport.setSchema(this.schema, conf);
+    ParquetProperties.WriterVersion writerVersion = getWriterVersion();
+    return new ParquetWriter<>(absoluteStagingFile, support, codec, blockSize, pageSize, dictPageSize, enableDictionary,
+        validate, writerVersion, conf);
+  }
+
+  /** Reads the branch-specific writer version, defaulting to {@link #DEFAULT_PARQUET_WRITER}. */
+  private ParquetProperties.WriterVersion getWriterVersion() {
+    return ParquetProperties.WriterVersion.fromString(
+        this.destination.getProperties().getProp(getProperty(WRITER_PARQUET_VERSION), DEFAULT_PARQUET_WRITER));
+  }
+
+  /** Reads the branch-specific {@code WRITER_CODEC_TYPE} value, defaulting to Snappy. */
+  private CompressionCodecName getCodecFromConfig() {
+    State state = this.destination.getProperties();
+    String codecValue = Optional.ofNullable(state.getProp(getProperty(WRITER_CODEC_TYPE)))
+        .orElse(CompressionCodecName.SNAPPY.toString());
+    // Locale.ROOT keeps the enum lookup independent of the JVM default locale (e.g. Turkish "gzip" -> "GZİP").
+    return CompressionCodecName.valueOf(codecValue.toUpperCase(java.util.Locale.ROOT));
+  }
+
+  /** Returns {@code key} adjusted for this builder's fork branch. */
+  private String getProperty(String key) {
+    return ForkOperatorUtils.getPropertyNameForBranch(key, this.getBranches(), this.getBranch());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
new file mode 100644
index 0000000..a775bc2
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetHdfsDataWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+import parquet.example.data.Group;
+import parquet.hadoop.ParquetWriter;
+
+
+/**
+ * An extension to {@link FsDataWriter} that writes in Parquet format in the form of {@link Group}s.
+ *
+ * <p>
+ * The underlying {@link ParquetWriter} comes from {@link ParquetDataWriterBuilder#getWriter}, which selects the
+ * compression codec from the configuration property {@link ConfigurationKeys#WRITER_CODEC_TYPE}. By default, the
+ * Snappy codec is used.
+ * </p>
+ *
+ * @author tilakpatidar
+ */
+public class ParquetHdfsDataWriter extends FsDataWriter<Group> {
+  private final ParquetWriter<Group> writer;
+  /** Number of records successfully handed to the {@link ParquetWriter}. */
+  protected final AtomicLong count = new AtomicLong(0);
+
+  /**
+   * Creates a writer targeting the staging file chosen by {@link FsDataWriter}.
+   *
+   * @param builder builder supplying the schema and Parquet settings
+   * @param state configuration for this writer
+   * @throws IOException if the staging file or the {@link ParquetWriter} cannot be set up
+   * @throws IllegalArgumentException if the configured block size does not fit in an {@code int}
+   */
+  public ParquetHdfsDataWriter(ParquetDataWriterBuilder builder, State state)
+      throws IOException {
+    super(builder, state);
+    this.writer = builder.getWriter(toParquetBlockSize(this.blockSize), this.stagingFile);
+  }
+
+  /**
+   * Narrows the file block size to the {@code int} block size {@link ParquetWriter} accepts, failing loudly instead
+   * of silently wrapping around (as a plain cast would) when the value is too large.
+   */
+  private static int toParquetBlockSize(long blockSize) {
+    if (blockSize > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          "Block size " + blockSize + " exceeds the maximum Parquet block size " + Integer.MAX_VALUE);
+    }
+    return (int) blockSize;
+  }
+
+  /** Writes one record and increments the record count once the write returns. */
+  @Override
+  public void write(Group record)
+      throws IOException {
+    this.writer.write(record);
+    this.count.incrementAndGet();
+  }
+
+  /** @return the number of records written so far */
+  @Override
+  public long recordsWritten() {
+    return this.count.get();
+  }
+
+  /** Closes the {@link ParquetWriter} and then, even if that fails, the parent {@link FsDataWriter}. */
+  @Override
+  public void close()
+      throws IOException {
+    try {
+      this.writer.close();
+    } finally {
+      super.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
new file mode 100644
index 0000000..40c638c
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/ParquetHdfsDataWriterTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import parquet.example.data.Group;
+import parquet.example.data.simple.convert.GroupRecordConverter;
+import parquet.hadoop.ParquetReader;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY;
+import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_DICTIONARY_PAGE_SIZE;
+import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_PAGE_SIZE;
+import static org.apache.gobblin.writer.ParquetDataWriterBuilder.WRITER_PARQUET_VALIDATE;
+
+
+/**
+ * Tests {@link ParquetHdfsDataWriter} by writing two records, committing, and reading the output file back.
+ */
+@Test(groups = {"gobblin.writer"})
+public class ParquetHdfsDataWriterTest {
+
+  private MessageType schema;
+  private String filePath;
+  private ParquetHdfsDataWriter writer;
+  private State properties;
+
+  @BeforeMethod
+  public void setUp()
+      throws Exception {
+    // Make the staging and output dirs if necessary.
+    ensureDirectory(new File(TestConstants.TEST_STAGING_DIR));
+    ensureDirectory(new File(TestConstants.TEST_OUTPUT_DIR));
+    this.schema = TestConstants.PARQUET_SCHEMA;
+    this.filePath = getFilePath();
+    this.properties = createStateWithConfig();
+    this.writer = (ParquetHdfsDataWriter) getParquetDataWriterBuilder().build();
+  }
+
+  /** Creates {@code dir} (with parents) unless it already is a directory; fails the test if that is impossible. */
+  private static void ensureDirectory(File dir) {
+    // Assert.assertTrue always runs, unlike the Java assert keyword, which is a no-op without -ea.
+    Assert.assertTrue(dir.isDirectory() || dir.mkdirs(), "Could not create directory " + dir);
+  }
+
+  /** Relative output path: namespace as directories, then table, then "{extractId}_{pullType}". */
+  private String getFilePath() {
+    return TestConstants.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstants.TEST_EXTRACT_TABLE + "/"
+        + TestConstants.TEST_EXTRACT_ID + "_" + TestConstants.TEST_EXTRACT_PULL_TYPE;
+  }
+
+  /** Writer configuration pointing at the test directories, with explicit Parquet settings. */
+  private State createStateWithConfig() {
+    State properties = new State();
+    properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, ConfigurationKeys.DEFAULT_BUFFER_SIZE);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, TestConstants.TEST_FS_URI);
+    properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, TestConstants.TEST_STAGING_DIR);
+    properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, TestConstants.TEST_OUTPUT_DIR);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, this.filePath);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, TestConstants.PARQUET_TEST_FILENAME);
+    properties.setProp(WRITER_PARQUET_DICTIONARY, true);
+    properties.setProp(WRITER_PARQUET_DICTIONARY_PAGE_SIZE, 1024);
+    properties.setProp(WRITER_PARQUET_PAGE_SIZE, 1024);
+    properties.setProp(WRITER_PARQUET_VALIDATE, true);
+    return properties;
+  }
+
+  private ParquetDataWriterBuilder getParquetDataWriterBuilder() {
+    ParquetDataWriterBuilder writerBuilder = new ParquetDataWriterBuilder();
+    writerBuilder.destination = Destination.of(Destination.DestinationType.HDFS, properties);
+    writerBuilder.writerId = TestConstants.TEST_WRITER_ID;
+    writerBuilder.schema = this.schema;
+    writerBuilder.format = WriterOutputFormat.PARQUET;
+    return writerBuilder;
+  }
+
+  /** Reads every record of {@code outputFile}; read and close failures propagate and fail the test. */
+  private List<Group> readParquetFiles(File outputFile)
+      throws IOException {
+    List<Group> records = new ArrayList<>();
+    try (ParquetReader<Group> reader = new ParquetReader<>(new Path(outputFile.toString()), new SimpleReadSupport())) {
+      for (Group value = reader.read(); value != null; value = reader.read()) {
+        records.add(value);
+      }
+    }
+    return records;
+  }
+
+  @Test
+  public void testWrite()
+      throws Exception {
+    Group record1 = TestConstants.PARQUET_RECORD_1;
+    Group record2 = TestConstants.PARQUET_RECORD_2;
+    String outputDirPath = TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath;
+    File outputFile = new File(outputDirPath, TestConstants.PARQUET_TEST_FILENAME);
+
+    this.writer.write(record1);
+    long firstWrite = this.writer.recordsWritten();
+    this.writer.write(record2);
+    long secondWrite = this.writer.recordsWritten();
+    this.writer.close();
+    this.writer.commit();
+    List<Group> records = readParquetFiles(outputFile);
+
+    Assert.assertEquals(firstWrite, 1);
+    Assert.assertEquals(secondWrite, 2);
+    // Check the count first so a short read fails with a clear message rather than IndexOutOfBoundsException.
+    Assert.assertEquals(records.size(), 2);
+    Group resultRecord1 = records.get(0);
+    Group resultRecord2 = records.get(1);
+    Assert.assertEquals(resultRecord1.getString("name", 0), "tilak");
+    Assert.assertEquals(resultRecord1.getInteger("age", 0), 22);
+    Assert.assertEquals(resultRecord2.getString("name", 0), "other");
+    Assert.assertEquals(resultRecord2.getInteger("age", 0), 22);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    // Clean up the staging and/or output directories if necessary
+    File testRootDir = new File(TestConstants.TEST_ROOT_DIR);
+    if (testRootDir.exists()) {
+      FileUtil.fullyDelete(testRootDir);
+    }
+  }
+
+  /** Minimal {@link ReadSupport} that requests the file's full schema and materializes {@link Group}s. */
+  static class SimpleReadSupport extends ReadSupport<Group> {
+    @Override
+    public RecordMaterializer<Group> prepareForRead(Configuration conf, Map<String, String> metaData,
+        MessageType schema, ReadContext context) {
+      return new GroupRecordConverter(schema);
+    }
+
+    @Override
+    public ReadContext init(InitContext context) {
+      return new ReadContext(context.getFileSchema());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/312e768f/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
new file mode 100644
index 0000000..6144aaf
--- /dev/null
+++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/writer/TestConstants.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+import parquet.example.data.Group;
+import parquet.example.data.simple.SimpleGroup;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.Types;
+
+
+/**
+ * Shared schema, sample records, and paths used by the Parquet writer tests.
+ */
+public class TestConstants {
+  public static final MessageType PARQUET_SCHEMA = Types.buildMessage()
+      .addFields(Types.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name"),
+          Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("age")).named("User");
+
+  /** First sample record; its fields are filled in by the static initializer below. */
+  public static final Group PARQUET_RECORD_1 = new SimpleGroup(PARQUET_SCHEMA);
+
+  /** Second sample record; its fields are filled in by the static initializer below. */
+  public static final Group PARQUET_RECORD_2 = new SimpleGroup(PARQUET_SCHEMA);
+
+  public static final String PARQUET_TEST_FILENAME = "test.parquet";
+
+  public static final String TEST_FS_URI = "file:///";
+
+  /**
+   * Root of all test directories. This must be a dedicated sub-directory of {@code java.io.tmpdir}, never the temp
+   * directory itself, because the test tear-down deletes it recursively.
+   */
+  public static final String TEST_ROOT_DIR =
+      System.getProperty("java.io.tmpdir") + "/gobblin-parquet-writer-test-" + System.currentTimeMillis();
+
+  public static final String TEST_STAGING_DIR = TEST_ROOT_DIR + "/staging";
+
+  public static final String TEST_OUTPUT_DIR = TEST_ROOT_DIR + "/output";
+
+  public static final String TEST_WRITER_ID = "writer-1";
+
+  public static final String TEST_EXTRACT_NAMESPACE = "com.linkedin.writer.test";
+
+  public static final String TEST_EXTRACT_ID = String.valueOf(System.currentTimeMillis());
+
+  public static final String TEST_EXTRACT_TABLE = "TestTable";
+
+  public static final String TEST_EXTRACT_PULL_TYPE = "FULL";
+
+  static {
+    PARQUET_RECORD_1.add("name", "tilak");
+    PARQUET_RECORD_1.add("age", 22);
+    PARQUET_RECORD_2.add("name", "other");
+    PARQUET_RECORD_2.add("age", 22);
+  }
+
+  private TestConstants() {
+    // Holder of constants only; not meant to be instantiated.
+  }
+}