Posted to commits@parquet.apache.org by ti...@apache.org on 2014/08/20 23:09:37 UTC
git commit: PARQUET-67: mechanism to add extra metadata in the footer
Repository: incubator-parquet-mr
Updated Branches:
refs/heads/master 54bb98327 -> 792b1490d
PARQUET-67: mechanism to add extra metadata in the footer
This expands on the idea proposed by @wesleypeck in https://github.com/Parquet/parquet-mr/pull/185
Author: julien <ju...@twitter.com>
Closes #32 from julienledem/extensible_metadata and squashes the following commits:
72e0a50 [julien] mechanism to add extra metadata in the footer
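In short: WriteSupport gains a finalizeWrite() hook that is called after the last record has been written, and the key/value pairs carried by the FinalizedWriteContext it returns are merged into the footer's key/value metadata. A minimal sketch of the resulting pattern, mirroring the test added below (CountingWriteSupport and the "my.app.record.count" key are illustrative names, not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    import parquet.example.data.Group;
    import parquet.hadoop.api.DelegatingWriteSupport;
    import parquet.hadoop.example.GroupWriteSupport;

    // Illustrative wrapper: counts records and publishes the count in the footer.
    public final class CountingWriteSupport extends DelegatingWriteSupport<Group> {

      private long count = 0;

      public CountingWriteSupport() {
        super(new GroupWriteSupport());
      }

      @Override
      public void write(Group record) {
        super.write(record);
        count++;
      }

      @Override
      public FinalizedWriteContext finalizeWrite() {
        // entries returned here end up in the file footer's key/value metadata
        Map<String, String> extraMetadata = new HashMap<String, String>();
        extraMetadata.put("my.app.record.count", String.valueOf(count));
        return new FinalizedWriteContext(extraMetadata);
      }
    }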
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/792b1490
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/792b1490
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/792b1490
Branch: refs/heads/master
Commit: 792b1490db122e953c9120279ddc86407ffae3c0
Parents: 54bb983
Author: julien <ju...@twitter.com>
Authored: Wed Aug 20 14:09:28 2014 -0700
Committer: Tianshuo Deng <td...@twitter.com>
Committed: Wed Aug 20 14:09:28 2014 -0700
----------------------------------------------------------------------
.../hadoop/InternalParquetRecordWriter.java | 7 +-
.../parquet/hadoop/ParquetOutputFormat.java | 9 +--
.../hadoop/api/DelegatingReadSupport.java | 44 ++++++++++++
.../hadoop/api/DelegatingWriteSupport.java | 48 +++++++++++++
.../java/parquet/hadoop/api/WriteSupport.java | 53 +++++++++++---
.../hadoop/example/TestInputOutputFormat.java | 76 ++++++++++++++++----
6 files changed, 209 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index c0a66a8..2c8da81 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -22,6 +22,7 @@ import static parquet.Log.DEBUG;
import static parquet.Preconditions.checkNotNull;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import parquet.Log;
@@ -29,6 +30,7 @@ import parquet.column.ParquetProperties.WriterVersion;
import parquet.column.impl.ColumnWriteStoreImpl;
import parquet.hadoop.CodecFactory.BytesCompressor;
import parquet.hadoop.api.WriteSupport;
+import parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
import parquet.io.ColumnIOFactory;
import parquet.io.MessageColumnIO;
import parquet.schema.MessageType;
@@ -108,7 +110,10 @@ class InternalParquetRecordWriter<T> {
public void close() throws IOException, InterruptedException {
flushStore();
- w.end(extraMetaData);
+ FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
+ Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData);
+ finalMetadata.putAll(finalWriteContext.getExtraMetaData());
+ w.end(finalMetadata);
}
public void write(T value) throws IOException, InterruptedException {
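One behavioral note on the merge in close() above: the map is seeded with the metadata captured at init() time (extraMetaData), and the FinalizedWriteContext entries are putAll()'d on top, so when the same key appears in both, the finalizeWrite() value wins:

    Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData); // init()-time entries first
    finalMetadata.putAll(finalWriteContext.getExtraMetaData()); // finalizeWrite() entries override on key collision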
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
index a8e5c0a..6703001 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
@@ -16,6 +16,7 @@
package parquet.hadoop;
import static parquet.Log.INFO;
+import static parquet.Preconditions.checkNotNull;
import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
import static parquet.hadoop.util.ContextUtil.getConfiguration;
@@ -33,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import parquet.Log;
+import parquet.Preconditions;
import parquet.column.ParquetProperties.WriterVersion;
import parquet.hadoop.api.WriteSupport;
import parquet.hadoop.api.WriteSupport.WriteContext;
@@ -183,7 +185,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
public static int getDictionaryPageSize(Configuration configuration) {
return configuration.getInt(DICTIONARY_PAGE_SIZE, DEFAULT_PAGE_SIZE);
}
-
+
public static WriterVersion getWriterVersion(Configuration configuration) {
String writerVersion = configuration.get(WRITER_VERSION, WriterVersion.PARQUET_1_0.toString());
return WriterVersion.fromString(writerVersion);
@@ -272,7 +274,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
WriteContext init = writeSupport.init(conf);
ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);
w.start();
-
+
return new ParquetRecordWriter<T>(
w,
writeSupport,
@@ -294,9 +296,8 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
public WriteSupport<T> getWriteSupport(Configuration configuration){
if (writeSupport != null) return writeSupport;
Class<?> writeSupportClass = getWriteSupportClass(configuration);
-
try {
- return (WriteSupport<T>)writeSupportClass.newInstance();
+ return (WriteSupport<T>)checkNotNull(writeSupportClass, "writeSupportClass").newInstance();
} catch (InstantiationException e) {
throw new BadConfigurationException("could not instantiate write support class: " + writeSupportClass, e);
} catch (IllegalAccessException e) {
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingReadSupport.java b/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingReadSupport.java
new file mode 100644
index 0000000..c987134
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingReadSupport.java
@@ -0,0 +1,44 @@
+package parquet.hadoop.api;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+/**
+ * Helps compose read supports
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T>
+ */
+public class DelegatingReadSupport<T> extends ReadSupport<T> {
+
+ private final ReadSupport<T> delegate;
+
+ public DelegatingReadSupport(ReadSupport<T> delegate) {
+ super();
+ this.delegate = delegate;
+ }
+
+ @Override
+ public ReadSupport.ReadContext init(InitContext context) {
+ return delegate.init(context);
+ }
+
+ @Override
+ public RecordMaterializer<T> prepareForRead(
+ Configuration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema,
+ ReadSupport.ReadContext readContext) {
+ return delegate.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext);
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getName() + "(" + delegate.toString() + ")";
+ }
+}
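A hedged usage sketch for the read side (VerifyingReadSupport and the "my.app.record.count" key are illustrative; note that InitContext.getKeyValueMetadata() yields a Set of values per key, since the footers of several input files may be merged):

    import java.util.Set;

    import parquet.example.data.Group;
    import parquet.hadoop.api.DelegatingReadSupport;
    import parquet.hadoop.api.InitContext;
    import parquet.hadoop.example.GroupReadSupport;

    // Illustrative wrapper: fails fast when the expected footer entry is absent.
    public final class VerifyingReadSupport extends DelegatingReadSupport<Group> {

      public VerifyingReadSupport() {
        super(new GroupReadSupport());
      }

      @Override
      public ReadContext init(InitContext context) {
        // one Set per key: values aggregated across all input files' footers
        Set<String> counts = context.getKeyValueMetadata().get("my.app.record.count");
        if (counts == null || counts.isEmpty()) {
          throw new IllegalStateException("missing footer entry: my.app.record.count");
        }
        return super.init(context);
      }
    }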
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingWriteSupport.java b/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingWriteSupport.java
new file mode 100644
index 0000000..69cf155
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingWriteSupport.java
@@ -0,0 +1,48 @@
+package parquet.hadoop.api;
+
+import org.apache.hadoop.conf.Configuration;
+
+import parquet.io.api.RecordConsumer;
+
+/**
+ *
+ * Helps compose write supports
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T>
+ */
+public class DelegatingWriteSupport<T> extends WriteSupport<T> {
+
+ private final WriteSupport<T> delegate;
+
+ public DelegatingWriteSupport(WriteSupport<T> delegate) {
+ super();
+ this.delegate = delegate;
+ }
+
+ @Override
+ public WriteSupport.WriteContext init(Configuration configuration) {
+ return delegate.init(configuration);
+ }
+
+ @Override
+ public void prepareForWrite(RecordConsumer recordConsumer) {
+ delegate.prepareForWrite(recordConsumer);
+ }
+
+ @Override
+ public void write(T record) {
+ delegate.write(record);
+ }
+
+ @Override
+ public WriteSupport.FinalizedWriteContext finalizeWrite() {
+ return delegate.finalizeWrite();
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getName() + "(" + delegate.toString() + ")";
+ }
+}
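Because every DelegatingWriteSupport wraps an arbitrary WriteSupport, cross-cutting concerns compose by nesting wrappers. A hedged sketch, under the assumption that each wrapper takes its delegate as a constructor argument (both wrapper classes here are hypothetical):

    // Hypothetical constructor-based wrappers, each extending DelegatingWriteSupport<Group>:
    WriteSupport<Group> support =
        new CountingWriteSupport(       // publishes a record count at finalizeWrite()
            new AuditingWriteSupport(   // some other illustrative concern
                new GroupWriteSupport()));

Note that ParquetOutputFormat.setWriteSupportClass() instantiates the class reflectively with newInstance(), so a write support configured that way needs a no-argument constructor (as in the test below).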
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/main/java/parquet/hadoop/api/WriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/api/WriteSupport.java b/parquet-hadoop/src/main/java/parquet/hadoop/api/WriteSupport.java
index 51eb45b..1ee37cd 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/api/WriteSupport.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/api/WriteSupport.java
@@ -15,7 +15,10 @@
*/
package parquet.hadoop.api;
+import static parquet.Preconditions.checkNotNull;
+
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -43,16 +46,14 @@ abstract public class WriteSupport<T> {
private final MessageType schema;
private final Map<String, String> extraMetaData;
+ /**
+ * @param schema the schema of the data
+ * @param extraMetaData application specific metadata to add in the file
+ */
public WriteContext(MessageType schema, Map<String, String> extraMetaData) {
super();
- if (schema == null) {
- throw new NullPointerException("schema");
- }
- if (extraMetaData == null) {
- throw new NullPointerException("extraMetaData");
- }
- this.schema = schema;
- this.extraMetaData = Collections.unmodifiableMap(extraMetaData);
+ this.schema = checkNotNull(schema, "schema");
+ this.extraMetaData = Collections.unmodifiableMap(checkNotNull(extraMetaData, "extraMetaData"));
}
/**
* @return the schema of the file
@@ -70,6 +71,34 @@ abstract public class WriteSupport<T> {
}
/**
+ * Information to be added in the file once all the records have been written
+ *
+ * @author Julien Le Dem
+ *
+ */
+ public static final class FinalizedWriteContext {
+ private final Map<String, String> extraMetaData;
+ // this class exists to facilitate evolution of the API
+ // we can add more fields later
+
+ /**
+ * @param extraMetaData application specific metadata to add in the file
+ */
+ public FinalizedWriteContext(Map<String, String> extraMetaData) {
+ super();
+ this.extraMetaData = Collections.unmodifiableMap(checkNotNull(extraMetaData, "extraMetaData"));
+ }
+
+ /**
+ * @return application specific metadata
+ */
+ public Map<String, String> getExtraMetaData() {
+ return extraMetaData;
+ }
+
+ }
+
+ /**
* called first in the task
* @param configuration the job's configuration
* @return the information needed to write the file
@@ -88,4 +117,12 @@ abstract public class WriteSupport<T> {
*/
public abstract void write(T record);
+ /**
+ * called once at the end, after the last record has been written
+ * @return information to be added in the file
+ */
+ public FinalizedWriteContext finalizeWrite() {
+ return new FinalizedWriteContext(new HashMap<String, String>());
+ }
+
}
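For completeness, a sketch of reading the extra metadata back outside of a MapReduce job, assuming the pre-existing ParquetFileReader.readFooter() and FileMetaData.getKeyValueMetaData() API of this repository (untouched by this commit); "my.app.record.count" is the illustrative key from the sketch above:

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    import parquet.hadoop.ParquetFileReader;
    import parquet.hadoop.metadata.ParquetMetadata;

    public class ReadFooterExample {
      public static void main(String[] args) throws Exception {
        // read the footer of a single Parquet file and print one key/value entry
        ParquetMetadata footer = ParquetFileReader.readFooter(new Configuration(), new Path(args[0]));
        Map<String, String> keyValues = footer.getFileMetaData().getKeyValueMetaData();
        System.out.println("my.app.record.count = " + keyValues.get("my.app.record.count"));
      }
    }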
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
index c9b6b6d..a541a9f 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
@@ -16,12 +16,17 @@
package parquet.hadoop.example;
import static java.lang.Thread.sleep;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +43,11 @@ import org.junit.Test;
import parquet.Log;
import parquet.example.data.Group;
import parquet.example.data.simple.SimpleGroupFactory;
+import parquet.hadoop.ParquetInputFormat;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.api.DelegatingReadSupport;
+import parquet.hadoop.api.DelegatingWriteSupport;
+import parquet.hadoop.api.InitContext;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.util.ContextUtil;
@@ -55,8 +65,8 @@ public class TestInputOutputFormat {
private String partialSchema;
private Configuration conf;
- private Class readMapperClass;
- private Class writeMapperClass;
+ private Class<? extends Mapper<?,?,?,?>> readMapperClass;
+ private Class<? extends Mapper<?,?,?,?>> writeMapperClass;
@Before
public void setUp() {
@@ -75,8 +85,44 @@ public class TestInputOutputFormat {
"required int32 line;\n" +
"}";
- readMapperClass =ReadMapper.class;
- writeMapperClass=WriteMapper.class;
+ readMapperClass = ReadMapper.class;
+ writeMapperClass = WriteMapper.class;
+ }
+
+
+ public static final class MyWriteSupport extends DelegatingWriteSupport<Group> {
+
+ private long count = 0;
+
+ public MyWriteSupport() {
+ super(new GroupWriteSupport());
+ }
+
+ @Override
+ public void write(Group record) {
+ super.write(record);
+ count++;
+ }
+
+ @Override
+ public parquet.hadoop.api.WriteSupport.FinalizedWriteContext finalizeWrite() {
+ Map<String, String> extraMetadata = new HashMap<String, String>();
+ extraMetadata.put("my.count", String.valueOf(count));
+ return new FinalizedWriteContext(extraMetadata);
+ }
+ }
+
+ public static final class MyReadSupport extends DelegatingReadSupport<Group> {
+ public MyReadSupport() {
+ super(new GroupReadSupport());
+ }
+
+ @Override
+ public parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
+ Set<String> counts = context.getKeyValueMetadata().get("my.count");
+ assertTrue("counts: " + counts, counts.size() > 0);
+ return super.init(context);
+ }
}
public static class ReadMapper extends Mapper<LongWritable, Text, Void, Group> {
@@ -117,26 +163,26 @@ public class TestInputOutputFormat {
TextInputFormat.addInputPath(writeJob, inputPath);
writeJob.setInputFormatClass(TextInputFormat.class);
writeJob.setNumReduceTasks(0);
- ExampleOutputFormat.setCompression(writeJob, codec);
- ExampleOutputFormat.setOutputPath(writeJob, parquetPath);
- writeJob.setOutputFormatClass(ExampleOutputFormat.class);
+ ParquetOutputFormat.setCompression(writeJob, codec);
+ ParquetOutputFormat.setOutputPath(writeJob, parquetPath);
+ writeJob.setOutputFormatClass(ParquetOutputFormat.class);
writeJob.setMapperClass(readMapperClass);
- ExampleOutputFormat.setSchema(
- writeJob,
- MessageTypeParser.parseMessageType(
- writeSchema));
+ ParquetOutputFormat.setWriteSupportClass(writeJob, MyWriteSupport.class);
+ GroupWriteSupport.setSchema(
+ MessageTypeParser.parseMessageType(writeSchema),
+ writeJob.getConfiguration());
writeJob.submit();
waitForJob(writeJob);
}
{
-
conf.set(ReadSupport.PARQUET_READ_SCHEMA, readSchema);
readJob = new Job(conf, "read");
- readJob.setInputFormatClass(ExampleInputFormat.class);
+ readJob.setInputFormatClass(ParquetInputFormat.class);
+ ParquetInputFormat.setReadSupportClass(readJob, MyReadSupport.class);
- ExampleInputFormat.setInputPaths(readJob, parquetPath);
+ ParquetInputFormat.setInputPaths(readJob, parquetPath);
readJob.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(readJob, outputPath);
readJob.setMapperClass(writeMapperClass);