You are viewing a plain text version of this content. The link to the canonical version was not preserved in this text copy.
Posted to commits@parquet.apache.org by al...@apache.org on 2015/03/13 21:36:52 UTC
incubator-parquet-mr git commit: PARQUET-197 : Gen parquet metadata from cascading
Repository: incubator-parquet-mr
Updated Branches:
refs/heads/master 9ee3a1617 -> 2e3c05359
PARQUET-197 : Gen parquet metadata from cascading
retry of PARQUET-197
fixed support for hadoop2 API
Author: Tianshuo Deng <td...@twitter.com>
Closes #141 from tsdeng/gen_parquet_meta and squashes the following commits:
d1211a0 [Tianshuo Deng] fix hadoop2 API
8686ce4 [Tianshuo Deng] gem parquet metadata
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/2e3c0535
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/2e3c0535
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/2e3c0535
Branch: refs/heads/master
Commit: 2e3c05359a0e21be44d307104eea3134afcef5f0
Parents: 9ee3a16
Author: Tianshuo Deng <td...@twitter.com>
Authored: Fri Mar 13 13:36:49 2015 -0700
Committer: Alex Levenson <al...@twitter.com>
Committed: Fri Mar 13 13:36:49 2015 -0700
----------------------------------------------------------------------
.../parquet/cascading/ParquetTBaseScheme.java | 2 +-
.../parquet/cascading/ParquetTupleScheme.java | 2 +-
.../cascading/TestParquetTBaseScheme.java | 19 ++++++---
.../parquet/hadoop/ParquetOutputCommitter.java | 4 ++
.../mapred/DeprecatedParquetOutputFormat.java | 5 +++
.../mapred/MapredParquetOutputCommitter.java | 45 ++++++++++++++++++++
.../parquet/scrooge/ParquetScroogeScheme.java | 5 +--
7 files changed, 71 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
index 41f8c4f..ab84749 100644
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
+++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
@@ -73,7 +73,7 @@ public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme
throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor");
}
- jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+ DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class);
TBaseWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
index ea0a953..7f6ac3a 100644
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
+++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetTupleScheme.java
@@ -171,7 +171,7 @@ public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputColl
@Override
public void sinkConfInit(FlowProcess<JobConf> fp,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
- jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+ DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema);
ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
index 11a5b5e..8e5b96b 100644
--- a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
+++ b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -54,6 +55,8 @@ import parquet.thrift.test.Name;
import java.io.File;
import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
public class TestParquetTBaseScheme {
final String txtInputPath = "src/test/resources/names.txt";
@@ -64,7 +67,8 @@ public class TestParquetTBaseScheme {
@Test
public void testWrite() throws Exception {
Path path = new Path(parquetOutputPath);
- final FileSystem fs = path.getFileSystem(new Configuration());
+ JobConf jobConf = new JobConf();
+ final FileSystem fs = path.getFileSystem(jobConf);
if (fs.exists(path)) fs.delete(path, true);
Scheme sourceScheme = new TextLine( new Fields( "first", "last" ) );
@@ -75,9 +79,14 @@ public class TestParquetTBaseScheme {
Pipe assembly = new Pipe( "namecp" );
assembly = new Each(assembly, new PackThriftFunction());
- Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+ HadoopFlowConnector hadoopFlowConnector = new HadoopFlowConnector();
+ Flow flow = hadoopFlowConnector.connect("namecp", source, sink, assembly);
flow.complete();
+
+ assertTrue(fs.exists(new Path(parquetOutputPath)));
+ assertTrue(fs.exists(new Path(parquetOutputPath + "/_metadata")));
+ assertTrue(fs.exists(new Path(parquetOutputPath + "/_common_metadata")));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
index 841c211..0e0ce42 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetOutputCommitter.java
@@ -45,6 +45,10 @@ public class ParquetOutputCommitter extends FileOutputCommitter {
public void commitJob(JobContext jobContext) throws IOException {
super.commitJob(jobContext);
Configuration configuration = ContextUtil.getConfiguration(jobContext);
+ writeMetaDataFile(configuration,outputPath);
+ }
+
+ public static void writeMetaDataFile(Configuration configuration, Path outputPath) {
if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
try {
final FileSystem fileSystem = outputPath.getFileSystem(configuration);
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
index 5b84e54..c0defb1 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
@@ -55,6 +55,11 @@ public class DeprecatedParquetOutputFormat<V> extends org.apache.hadoop.mapred.F
configuration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, enableDictionary);
}
+ public static void setAsOutputFormat(JobConf jobConf) {
+ jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+ jobConf.setOutputCommitter(MapredParquetOutputCommitter.class);
+ }
+
private CompressionCodecName getCodec(final JobConf conf) {
return CodecConfig.from(conf).getCodec();
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
new file mode 100644
index 0000000..fcb210f
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package parquet.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import parquet.hadoop.ParquetOutputCommitter;
+import parquet.hadoop.util.ContextUtil;
+
+import java.io.IOException;
+
+/**
+ * Adapter that reuses ParquetOutputCommitter's job-summary logic from the mapred (old) API.
+ * After the standard mapred job commit, it calls ParquetOutputCommitter.writeMetaDataFile for the
+ * job output path (that method does nothing unless ParquetOutputFormat.ENABLE_JOB_SUMMARY is true; default true).
+ * @author Tianshuo Deng
+ */
+public class MapredParquetOutputCommitter extends FileOutputCommitter {
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ super.commitJob(jobContext); // regular mapred commit first, same order as ParquetOutputCommitter.commitJob
+ Configuration conf = ContextUtil.getConfiguration(jobContext); // NOTE(review): presumably ContextUtil is used for hadoop1/hadoop2 JobContext compatibility - confirm
+ Path outputPath = FileOutputFormat.getOutputPath(new JobConf(conf)); // job output directory as configured on the mapred JobConf
+ ParquetOutputCommitter.writeMetaDataFile(conf, outputPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/2e3c0535/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
index 3abe957..2745307 100644
--- a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
+++ b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
@@ -18,8 +18,6 @@
*/
package parquet.scrooge;
-import java.io.IOException;
-
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
@@ -27,7 +25,6 @@ import org.apache.hadoop.mapred.RecordReader;
import com.twitter.scrooge.ThriftStruct;
import cascading.flow.FlowProcess;
-import cascading.scheme.SinkCall;
import cascading.tap.Tap;
import parquet.cascading.ParquetValueScheme;
import parquet.filter2.predicate.FilterPredicate;
@@ -56,7 +53,7 @@ public class ParquetScroogeScheme<T extends ThriftStruct> extends ParquetValueSc
@Override
public void sinkConfInit(FlowProcess<JobConf> fp,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
- jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+ DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
ParquetOutputFormat.setWriteSupportClass(jobConf, ScroogeWriteSupport.class);
ScroogeWriteSupport.setScroogeClass(jobConf, this.config.getKlass());
}