Posted to commits@parquet.apache.org by ti...@apache.org on 2014/08/20 23:09:37 UTC

git commit: PARQUET-67: mechanism to add extra metadata in the footer

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 54bb98327 -> 792b1490d


PARQUET-67: mechanism to add extra metadata in the footer

This expands on the idea proposed by @wesleypeck in https://github.com/Parquet/parquet-mr/pull/185.

Author: julien <ju...@twitter.com>

Closes #32 from julienledem/extensible_metadata and squashes the following commits:

72e0a50 [julien] mechanism to add extra metadata in the footer
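
For illustration, a minimal usage sketch of the mechanism, modeled on the MyWriteSupport test case in the diff below (the class name CountingWriteSupport is hypothetical): a write support composed with DelegatingWriteSupport overrides finalizeWrite() to publish application metadata into the footer once the last record has been written.

    import java.util.HashMap;
    import java.util.Map;

    import parquet.example.data.Group;
    import parquet.hadoop.api.DelegatingWriteSupport;
    import parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
    import parquet.hadoop.example.GroupWriteSupport;

    public final class CountingWriteSupport extends DelegatingWriteSupport<Group> {

      private long count = 0;

      public CountingWriteSupport() {
        // delegate the actual record writing to the standard Group write support
        super(new GroupWriteSupport());
      }

      @Override
      public void write(Group record) {
        super.write(record);
        ++count;
      }

      @Override
      public FinalizedWriteContext finalizeWrite() {
        // called once after the last record; the returned map ends up in the footer
        Map<String, String> extraMetadata = new HashMap<String, String>();
        extraMetadata.put("my.count", String.valueOf(count));
        return new FinalizedWriteContext(extraMetadata);
      }
    }

When the file is closed, the map returned by finalizeWrite() is merged with the metadata from init() and passed to ParquetFileWriter.end() (see the InternalParquetRecordWriter hunk below).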


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/792b1490
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/792b1490
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/792b1490

Branch: refs/heads/master
Commit: 792b1490db122e953c9120279ddc86407ffae3c0
Parents: 54bb983
Author: julien <ju...@twitter.com>
Authored: Wed Aug 20 14:09:28 2014 -0700
Committer: Tianshuo Deng <td...@twitter.com>
Committed: Wed Aug 20 14:09:28 2014 -0700

----------------------------------------------------------------------
 .../hadoop/InternalParquetRecordWriter.java     |  7 +-
 .../parquet/hadoop/ParquetOutputFormat.java     |  9 +--
 .../hadoop/api/DelegatingReadSupport.java       | 44 ++++++++++++
 .../hadoop/api/DelegatingWriteSupport.java      | 48 +++++++++++++
 .../java/parquet/hadoop/api/WriteSupport.java   | 53 +++++++++++---
 .../hadoop/example/TestInputOutputFormat.java   | 76 ++++++++++++++++----
 6 files changed, 209 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
index c0a66a8..2c8da81 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordWriter.java
@@ -22,6 +22,7 @@ import static parquet.Log.DEBUG;
 import static parquet.Preconditions.checkNotNull;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 
 import parquet.Log;
@@ -29,6 +30,7 @@ import parquet.column.ParquetProperties.WriterVersion;
 import parquet.column.impl.ColumnWriteStoreImpl;
 import parquet.hadoop.CodecFactory.BytesCompressor;
 import parquet.hadoop.api.WriteSupport;
+import parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
 import parquet.io.ColumnIOFactory;
 import parquet.io.MessageColumnIO;
 import parquet.schema.MessageType;
@@ -108,7 +110,10 @@ class InternalParquetRecordWriter<T> {
 
   public void close() throws IOException, InterruptedException {
     flushStore();
-    w.end(extraMetaData);
+    FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
+    Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData);
+    finalMetadata.putAll(finalWriteContext.getExtraMetaData());
+    w.end(finalMetadata);
   }
 
   public void write(T value) throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
index a8e5c0a..6703001 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputFormat.java
@@ -16,6 +16,7 @@
 package parquet.hadoop;
 
 import static parquet.Log.INFO;
+import static parquet.Preconditions.checkNotNull;
 import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
 import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
 import static parquet.hadoop.util.ContextUtil.getConfiguration;
@@ -33,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import parquet.Log;
+import parquet.Preconditions;
 import parquet.column.ParquetProperties.WriterVersion;
 import parquet.hadoop.api.WriteSupport;
 import parquet.hadoop.api.WriteSupport.WriteContext;
@@ -183,7 +185,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public static int getDictionaryPageSize(Configuration configuration) {
     return configuration.getInt(DICTIONARY_PAGE_SIZE, DEFAULT_PAGE_SIZE);
   }
-  
+
   public static WriterVersion getWriterVersion(Configuration configuration) {
     String writerVersion = configuration.get(WRITER_VERSION, WriterVersion.PARQUET_1_0.toString());
     return WriterVersion.fromString(writerVersion);
@@ -272,7 +274,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     WriteContext init = writeSupport.init(conf);
     ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);
     w.start();
-    
+
     return new ParquetRecordWriter<T>(
         w,
         writeSupport,
@@ -294,9 +296,8 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public WriteSupport<T> getWriteSupport(Configuration configuration){
     if (writeSupport != null) return writeSupport;
     Class<?> writeSupportClass = getWriteSupportClass(configuration);
-
     try {
-      return (WriteSupport<T>)writeSupportClass.newInstance();
+      return (WriteSupport<T>)checkNotNull(writeSupportClass, "writeSupportClass").newInstance();
     } catch (InstantiationException e) {
       throw new BadConfigurationException("could not instantiate write support class: " + writeSupportClass, e);
     } catch (IllegalAccessException e) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingReadSupport.java b/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingReadSupport.java
new file mode 100644
index 0000000..c987134
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingReadSupport.java
@@ -0,0 +1,44 @@
+package parquet.hadoop.api;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+/**
+ * Helps composing read supports
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T>
+ */
+public class DelegatingReadSupport<T> extends ReadSupport<T> {
+
+  private final ReadSupport<T> delegate;
+
+  public DelegatingReadSupport(ReadSupport<T> delegate) {
+    super();
+    this.delegate = delegate;
+  }
+
+  @Override
+  public ReadSupport.ReadContext init(InitContext context) {
+    return delegate.init(context);
+  }
+
+  @Override
+  public RecordMaterializer<T> prepareForRead(
+      Configuration configuration,
+      Map<String, String> keyValueMetaData,
+      MessageType fileSchema,
+      ReadSupport.ReadContext readContext) {
+    return delegate.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext);
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getName() + "(" + delegate.toString() + ")";
+  }
+}
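
A matching sketch on the read side, modeled on the MyReadSupport test case below (the class name CountCheckingReadSupport and the "my.count" key are carried over from the write-side sketch above): a read support composed with DelegatingReadSupport can inspect the footer metadata during init() before delegating.

    import java.util.Set;

    import parquet.example.data.Group;
    import parquet.hadoop.api.DelegatingReadSupport;
    import parquet.hadoop.api.InitContext;
    import parquet.hadoop.api.ReadSupport;
    import parquet.hadoop.example.GroupReadSupport;

    public final class CountCheckingReadSupport extends DelegatingReadSupport<Group> {

      public CountCheckingReadSupport() {
        // delegate record materialization to the standard Group read support
        super(new GroupReadSupport());
      }

      @Override
      public ReadSupport.ReadContext init(InitContext context) {
        // footer key/value metadata, aggregated across all footers of the input
        Set<String> counts = context.getKeyValueMetadata().get("my.count");
        System.out.println("record counts found in footers: " + counts);
        return super.init(context);
      }
    }

getKeyValueMetadata() yields a Set of values per key because the input may span several files whose footers carry different values for the same key.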

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingWriteSupport.java b/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingWriteSupport.java
new file mode 100644
index 0000000..69cf155
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/api/DelegatingWriteSupport.java
@@ -0,0 +1,48 @@
+package parquet.hadoop.api;
+
+import org.apache.hadoop.conf.Configuration;
+
+import parquet.io.api.RecordConsumer;
+
+/**
+ *
+ * Helps composing write supports
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T>
+ */
+public class DelegatingWriteSupport<T> extends WriteSupport<T> {
+
+  private final WriteSupport<T> delegate;
+
+  public DelegatingWriteSupport(WriteSupport<T> delegate) {
+    super();
+    this.delegate = delegate;
+  }
+
+  @Override
+  public WriteSupport.WriteContext init(Configuration configuration) {
+    return delegate.init(configuration);
+  }
+
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    delegate.prepareForWrite(recordConsumer);
+  }
+
+  @Override
+  public void write(T record) {
+    delegate.write(record);
+  }
+
+  @Override
+  public WriteSupport.FinalizedWriteContext finalizeWrite() {
+    return delegate.finalizeWrite();
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getName() + "(" + delegate.toString() + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/main/java/parquet/hadoop/api/WriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/api/WriteSupport.java b/parquet-hadoop/src/main/java/parquet/hadoop/api/WriteSupport.java
index 51eb45b..1ee37cd 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/api/WriteSupport.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/api/WriteSupport.java
@@ -15,7 +15,10 @@
  */
 package parquet.hadoop.api;
 
+import static parquet.Preconditions.checkNotNull;
+
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
@@ -43,16 +46,14 @@ abstract public class WriteSupport<T> {
     private final MessageType schema;
     private final Map<String, String> extraMetaData;
 
+    /**
+     * @param schema the schema of the data
+     * @param extraMetaData application specific metadata to add in the file
+     */
     public WriteContext(MessageType schema, Map<String, String> extraMetaData) {
       super();
-      if (schema == null) {
-        throw new NullPointerException("schema");
-      }
-      if (extraMetaData == null) {
-        throw new NullPointerException("extraMetaData");
-      }
-      this.schema = schema;
-      this.extraMetaData = Collections.unmodifiableMap(extraMetaData);
+      this.schema = checkNotNull(schema, "schema");
+      this.extraMetaData = Collections.unmodifiableMap(checkNotNull(extraMetaData, "extraMetaData"));
     }
     /**
      * @return the schema of the file
@@ -70,6 +71,34 @@ abstract public class WriteSupport<T> {
   }
 
   /**
+   * Information to be added in the file once all the records have been written
+   *
+   * @author Julien Le Dem
+   *
+   */
+  public static final class FinalizedWriteContext {
+    private final Map<String, String> extraMetaData;
+    // this class exists to facilitate evolution of the API
+    // we can add more fields later
+
+    /**
+     * @param extraMetaData application specific metadata to add in the file
+     */
+    public FinalizedWriteContext(Map<String, String> extraMetaData) {
+      super();
+      this.extraMetaData = Collections.unmodifiableMap(checkNotNull(extraMetaData, "extraMetaData"));
+    }
+
+    /**
+     * @return application specific metadata
+     */
+    public Map<String, String> getExtraMetaData() {
+      return extraMetaData;
+    }
+
+  }
+
+  /**
    * called first in the task
    * @param configuration the job's configuration
    * @return the information needed to write the file
@@ -88,4 +117,12 @@ abstract public class WriteSupport<T> {
    */
   public abstract void write(T record);
 
+  /**
+   * called once in the end after the last record was written
+   * @return information to be added in the file
+   */
+  public FinalizedWriteContext finalizeWrite() {
+    return new FinalizedWriteContext(new HashMap<String, String>());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/792b1490/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
index c9b6b6d..a541a9f 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java
@@ -16,12 +16,17 @@
 package parquet.hadoop.example;
 
 import static java.lang.Thread.sleep;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +43,11 @@ import org.junit.Test;
 import parquet.Log;
 import parquet.example.data.Group;
 import parquet.example.data.simple.SimpleGroupFactory;
+import parquet.hadoop.ParquetInputFormat;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.api.DelegatingReadSupport;
+import parquet.hadoop.api.DelegatingWriteSupport;
+import parquet.hadoop.api.InitContext;
 import parquet.hadoop.api.ReadSupport;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.util.ContextUtil;
@@ -55,8 +65,8 @@ public class TestInputOutputFormat {
   private String partialSchema;
   private Configuration conf;
 
-  private Class readMapperClass;
-  private Class writeMapperClass;
+  private Class<? extends Mapper<?,?,?,?>> readMapperClass;
+  private Class<? extends Mapper<?,?,?,?>> writeMapperClass;
 
   @Before
   public void setUp() {
@@ -75,8 +85,44 @@ public class TestInputOutputFormat {
             "required int32 line;\n" +
             "}";
 
-    readMapperClass =ReadMapper.class;
-    writeMapperClass=WriteMapper.class;
+    readMapperClass = ReadMapper.class;
+    writeMapperClass = WriteMapper.class;
+  }
+
+
+  public static final class MyWriteSupport extends DelegatingWriteSupport<Group> {
+
+    private long count = 0;
+
+    public MyWriteSupport() {
+      super(new GroupWriteSupport());
+    }
+
+    @Override
+    public void write(Group record) {
+      super.write(record);
+      ++ count;
+    }
+
+    @Override
+    public parquet.hadoop.api.WriteSupport.FinalizedWriteContext finalizeWrite() {
+      Map<String, String> extraMetadata = new HashMap<String, String>();
+      extraMetadata.put("my.count", String.valueOf(count));
+      return new FinalizedWriteContext(extraMetadata);
+    }
+  }
+
+  public static final class MyReadSupport extends DelegatingReadSupport<Group> {
+    public MyReadSupport() {
+      super(new GroupReadSupport());
+    }
+
+    @Override
+    public parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
+      Set<String> counts = context.getKeyValueMetadata().get("my.count");
+      assertTrue("counts: " + counts, counts.size() > 0);
+      return super.init(context);
+    }
   }
 
   public static class ReadMapper extends Mapper<LongWritable, Text, Void, Group> {
@@ -117,26 +163,26 @@ public class TestInputOutputFormat {
       TextInputFormat.addInputPath(writeJob, inputPath);
       writeJob.setInputFormatClass(TextInputFormat.class);
       writeJob.setNumReduceTasks(0);
-      ExampleOutputFormat.setCompression(writeJob, codec);
-      ExampleOutputFormat.setOutputPath(writeJob, parquetPath);
-      writeJob.setOutputFormatClass(ExampleOutputFormat.class);
+      ParquetOutputFormat.setCompression(writeJob, codec);
+      ParquetOutputFormat.setOutputPath(writeJob, parquetPath);
+      writeJob.setOutputFormatClass(ParquetOutputFormat.class);
       writeJob.setMapperClass(readMapperClass);
 
-      ExampleOutputFormat.setSchema(
-              writeJob,
-              MessageTypeParser.parseMessageType(
-                      writeSchema));
+      ParquetOutputFormat.setWriteSupportClass(writeJob, MyWriteSupport.class);
+      GroupWriteSupport.setSchema(
+              MessageTypeParser.parseMessageType(writeSchema),
+              writeJob.getConfiguration());
       writeJob.submit();
       waitForJob(writeJob);
     }
     {
-
       conf.set(ReadSupport.PARQUET_READ_SCHEMA, readSchema);
       readJob = new Job(conf, "read");
 
-      readJob.setInputFormatClass(ExampleInputFormat.class);
+      readJob.setInputFormatClass(ParquetInputFormat.class);
+      ParquetInputFormat.setReadSupportClass(readJob, MyReadSupport.class);
 
-      ExampleInputFormat.setInputPaths(readJob, parquetPath);
+      ParquetInputFormat.setInputPaths(readJob, parquetPath);
       readJob.setOutputFormatClass(TextOutputFormat.class);
       TextOutputFormat.setOutputPath(readJob, outputPath);
       readJob.setMapperClass(writeMapperClass);