You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this text.)
Posted to commits@parquet.apache.org by al...@apache.org on 2015/03/13 21:36:52 UTC

incubator-parquet-mr git commit: PARQUET-197 : Gen parquet metadata from cascading

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 9ee3a1617 -> 2e3c05359


PARQUET-197 : Gen parquet metadata from cascading

Retry of PARQUET-197.

Fixed support for the Hadoop 2 API.

Author: Tianshuo Deng <td...@twitter.com>

Closes #141 from tsdeng/gen_parquet_meta and squashes the following commits:

d1211a0 [Tianshuo Deng] fix hadoop2 API
8686ce4 [Tianshuo Deng] gem parquet metadata


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/2e3c0535
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/2e3c0535
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/2e3c0535

Branch: refs/heads/master
Commit: 2e3c05359a0e21be44d307104eea3134afcef5f0
Parents: 9ee3a16
Author: Tianshuo Deng <td...@twitter.com>
Authored: Fri Mar 13 13:36:49 2015 -0700
Committer: Alex Levenson <al...@twitter.com>
Committed: Fri Mar 13 13:36:49 2015 -0700

----------------------------------------------------------------------
 .../parquet/cascading/ParquetTBaseScheme.java   |  2 +-
 .../parquet/cascading/ParquetTupleScheme.java   |  2 +-
 .../cascading/TestParquetTBaseScheme.java       | 19 ++++++---
 .../parquet/hadoop/ParquetOutputCommitter.java  |  4 ++
 .../mapred/DeprecatedParquetOutputFormat.java   |  5 +++
 .../mapred/MapredParquetOutputCommitter.java    | 45 ++++++++++++++++++++
 .../parquet/scrooge/ParquetScroogeScheme.java   |  5 +--
 7 files changed, 71 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
index 41f8c4f..ab84749 100644
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
+++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
@@ -73,7 +73,7 @@ public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme
       throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor");
     }
 
-    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
     DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class);
     TBaseWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
index ea0a953..7f6ac3a 100644
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
+++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
@@ -171,7 +171,7 @@ public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputColl
   @Override
   public void sinkConfInit(FlowProcess<JobConf> fp,
           Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
-    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
     jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema);
     ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
index 11a5b5e..8e5b96b 100644
--- a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
+++ b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -54,6 +55,8 @@ import parquet.thrift.test.Name;
 
 import java.io.File;
 import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
 
 public class TestParquetTBaseScheme {
   final String txtInputPath = "src/test/resources/names.txt";
@@ -64,7 +67,8 @@ public class TestParquetTBaseScheme {
   @Test
   public void testWrite() throws Exception {
     Path path = new Path(parquetOutputPath);
-    final FileSystem fs = path.getFileSystem(new Configuration());
+    JobConf jobConf = new JobConf();
+    final FileSystem fs = path.getFileSystem(jobConf);
     if (fs.exists(path)) fs.delete(path, true);
 
     Scheme sourceScheme = new TextLine( new Fields( "first", "last" ) );
@@ -75,9 +79,14 @@ public class TestParquetTBaseScheme {
 
     Pipe assembly = new Pipe( "namecp" );
     assembly = new Each(assembly, new PackThriftFunction());
-    Flow flow  = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+    HadoopFlowConnector hadoopFlowConnector = new HadoopFlowConnector();
+    Flow flow  = hadoopFlowConnector.connect("namecp", source, sink, assembly);
 
     flow.complete();
+
+    assertTrue(fs.exists(new Path(parquetOutputPath)));
+    assertTrue(fs.exists(new Path(parquetOutputPath + "/_metadata")));
+    assertTrue(fs.exists(new Path(parquetOutputPath + "/_common_metadata")));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
index 841c211..0e0ce42 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
@@ -45,6 +45,10 @@ public class ParquetOutputCommitter extends FileOutputCommitter {
   public void commitJob(JobContext jobContext) throws IOException {
     super.commitJob(jobContext);
     Configuration configuration = ContextUtil.getConfiguration(jobContext);
+    writeMetaDataFile(configuration,outputPath);
+  }
+
+  public static void writeMetaDataFile(Configuration configuration, Path outputPath) {
     if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
       try {
         final FileSystem fileSystem = outputPath.getFileSystem(configuration);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
index 5b84e54..c0defb1 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
@@ -55,6 +55,11 @@ public class DeprecatedParquetOutputFormat<V> extends org.apache.hadoop.mapred.F
     configuration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, enableDictionary);
   }
 
+  public static void setAsOutputFormat(JobConf jobConf) {
+    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class); // mapred Parquet sink format
+    jobConf.setOutputCommitter(MapredParquetOutputCommitter.class); // also writes summary metadata at job commit
+  }
+
   private CompressionCodecName getCodec(final JobConf conf) {
     return CodecConfig.from(conf).getCodec();
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
new file mode 100644
index 0000000..fcb210f
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package parquet.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import parquet.hadoop.ParquetOutputCommitter;
+import parquet.hadoop.util.ContextUtil;
+
+import java.io.IOException;
+
+/**
+ * Old (mapred) API counterpart of ParquetOutputCommitter: after the regular
+ * FileOutputCommitter job commit, writes Parquet job-summary metadata into the
+ * job output directory. Installed by DeprecatedParquetOutputFormat.setAsOutputFormat.
+ * @author Tianshuo Deng
+ */
+public class MapredParquetOutputCommitter extends FileOutputCommitter {
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    super.commitJob(jobContext); // finish the standard commit before writing metadata
+    Configuration conf = ContextUtil.getConfiguration(jobContext);
+    Path outputPath = FileOutputFormat.getOutputPath(new JobConf(conf));
+    ParquetOutputCommitter.writeMetaDataFile(conf, outputPath); // gated on ENABLE_JOB_SUMMARY
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
index 3abe957..2745307 100644
--- a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
+++ b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
@@ -18,8 +18,6 @@
  */
 package parquet.scrooge;
 
-import java.io.IOException;
-
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
@@ -27,7 +25,6 @@ import org.apache.hadoop.mapred.RecordReader;
 import com.twitter.scrooge.ThriftStruct;
 
 import cascading.flow.FlowProcess;
-import cascading.scheme.SinkCall;
 import cascading.tap.Tap;
 import parquet.cascading.ParquetValueScheme;
 import parquet.filter2.predicate.FilterPredicate;
@@ -56,7 +53,7 @@ public class ParquetScroogeScheme<T extends ThriftStruct> extends ParquetValueSc
   @Override
   public void sinkConfInit(FlowProcess<JobConf> fp,
       Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
-    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+    DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
     ParquetOutputFormat.setWriteSupportClass(jobConf, ScroogeWriteSupport.class);
     ScroogeWriteSupport.setScroogeClass(jobConf, this.config.getKlass());
   }