You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2014/09/10 19:38:03 UTC
git commit: PARQUET-87: Add API for projection pushdown on the
cascading scheme level
Repository: incubator-parquet-mr
Updated Branches:
refs/heads/master 24119cc8e -> f637c4458
PARQUET-87: Add API for projection pushdown on the cascading scheme level
JIRA: https://issues.apache.org/jira/browse/PARQUET-87
Previously, the projection pushdown configuration was global and not bound to a specific tap.
After adding this API, projection pushdown can be done more "naturally", which may benefit scalding. The code that uses this API would look like:
```
Scheme sourceScheme = new ParquetScroogeScheme(new Config().withProjectionString(projectionFilter));
Tap source = new Hfs(sourceScheme, PARQUET_PATH);
```
Author: Tianshuo Deng <td...@twitter.com>
Closes #51 from tsdeng/projection_from_scheme and squashes the following commits:
2c72757 [Tianshuo Deng] make config class final
813dc1a [Tianshuo Deng] Merge branch 'master' into projection_from_scheme
b587b79 [Tianshuo Deng] make constructor of Config private, fix format
3aa7dd2 [Tianshuo Deng] remove builder
9348266 [Tianshuo Deng] use builder()
7c91869 [Tianshuo Deng] make fields of Config private, create builder method for Config
5fdc881 [Tianshuo Deng] builder for setting projection pushdown and predicate pushdown
a47f271 [Tianshuo Deng] immutable
3d514b1 [Tianshuo Deng] done
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/f637c445
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/f637c445
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/f637c445
Branch: refs/heads/master
Commit: f637c4458a3b1dc4ecaa35957adf13ecfbe7d12d
Parents: 24119cc
Author: Tianshuo Deng <td...@twitter.com>
Authored: Wed Sep 10 10:37:51 2014 -0700
Committer: julien <ju...@twitter.com>
Committed: Wed Sep 10 10:37:51 2014 -0700
----------------------------------------------------------------------
.../parquet/cascading/ParquetTBaseScheme.java | 23 ++--
.../parquet/cascading/ParquetValueScheme.java | 78 +++++++++++-
.../parquet/scrooge/ParquetScroogeScheme.java | 12 +-
.../scrooge/ParquetScroogeSchemeTest.java | 119 +++++++++++++------
.../hadoop/thrift/ThriftReadSupport.java | 4 +
5 files changed, 175 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/f637c445/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
index 111c7ab..40817af 100644
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
+++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
@@ -33,38 +33,35 @@ import parquet.thrift.TBaseRecordConverter;
public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> {
- private Class<T> thriftClass;
-
// In the case of reads, we can read the thrift class from the file metadata
public ParquetTBaseScheme() {
+ this(new Config());
}
public ParquetTBaseScheme(Class<T> thriftClass) {
- this.thriftClass = thriftClass;
+ this(new Config().withRecordClass(thriftClass));
}
public ParquetTBaseScheme(FilterPredicate filterPredicate) {
- super(filterPredicate);
+ this(new Config().withFilterPredicate(filterPredicate));
}
public ParquetTBaseScheme(FilterPredicate filterPredicate, Class<T> thriftClass) {
- super(filterPredicate);
- this.thriftClass = thriftClass;
+ this(new Config().withRecordClass(thriftClass).withFilterPredicate(filterPredicate));
+ }
+
+ public ParquetTBaseScheme(Config config) {
+ super(config);
}
@SuppressWarnings("rawtypes")
@Override
public void sourceConfInit(FlowProcess<JobConf> fp,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
-
super.sourceConfInit(fp, tap, jobConf);
jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class);
ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class);
-
- if (thriftClass != null) {
- ParquetThriftInputFormat.setThriftClass(jobConf, thriftClass);
- }
}
@SuppressWarnings("rawtypes")
@@ -72,12 +69,12 @@ public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme
public void sinkConfInit(FlowProcess<JobConf> fp,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
- if (thriftClass == null) {
+ if (this.config.getKlass() == null) {
throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor");
}
jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, ThriftWriteSupport.class);
- ThriftWriteSupport.<T>setThriftClass(jobConf, thriftClass);
+ ThriftWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/f637c445/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java
index 5296aee..6e8c13a 100644
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java
+++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java
@@ -16,6 +16,7 @@
package parquet.cascading;
import java.io.IOException;
+import java.io.Serializable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
@@ -31,6 +32,10 @@ import cascading.tuple.TupleEntry;
import parquet.filter2.predicate.FilterPredicate;
import parquet.hadoop.ParquetInputFormat;
import parquet.hadoop.mapred.Container;
+import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import parquet.hadoop.thrift.ParquetThriftInputFormat;
+import parquet.hadoop.thrift.ThriftReadSupport;
+import parquet.hadoop.thrift.ThriftWriteSupport;
import static parquet.Preconditions.checkNotNull;
@@ -42,21 +47,84 @@ import static parquet.Preconditions.checkNotNull;
* correctly in the respective Init methods.
*/
public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
+
+ public static final class Config<T> implements Serializable {
+ private final FilterPredicate filterPredicate;
+ private final String projectionString;
+ private final Class<T> klass;
+ private Config(Class<T> klass, FilterPredicate filterPredicate, String projectionString) {
+ this.filterPredicate = filterPredicate;
+ this.projectionString = projectionString;
+ this.klass = klass;
+ }
+
+ public Config() {
+ filterPredicate = null;
+ projectionString = null;
+ klass = null;
+ }
+
+ public FilterPredicate getFilterPredicate() {
+ return filterPredicate;
+ }
+
+ public String getProjectionString() {
+ return projectionString;
+ }
+
+ public Class<T> getKlass() {
+ return klass;
+ }
+
+ public Config withFilterPredicate(FilterPredicate f) {
+ return new Config(this.klass, checkNotNull(f, "filterPredicate"), this.projectionString);
+ }
+
+ public Config withProjectionString(String p) {
+ return new Config(this.klass, this.filterPredicate, checkNotNull(p, "projectionFilter"));
+ }
+
+ public Config withRecordClass(Class<T> klass) {
+ return new Config(checkNotNull(klass, "recordClass"), this.filterPredicate, this.projectionString);
+ }
+ }
+
private static final long serialVersionUID = 157560846420730043L;
- private final FilterPredicate filterPredicate;
+ protected final Config config;
public ParquetValueScheme() {
- this.filterPredicate = null;
+ this(new Config());
}
public ParquetValueScheme(FilterPredicate filterPredicate) {
- this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+ this(new Config().withFilterPredicate(filterPredicate));
}
+ public ParquetValueScheme(Config config) {
+ this.config = config;
+ }
+
+ private void setProjectionPushdown(JobConf jobConf) {
+ if (this.config.projectionString!= null) {
+ ThriftReadSupport.setProjectionPushdown(jobConf, this.config.projectionString);
+ }
+ }
+
+ private void setPredicatePushdown(JobConf jobConf) {
+ if (this.config.filterPredicate != null) {
+ ParquetInputFormat.setFilterPredicate(jobConf, this.config.filterPredicate);
+ }
+ }
@Override
public void sourceConfInit(FlowProcess<JobConf> jobConfFlowProcess, Tap<JobConf, RecordReader, OutputCollector> jobConfRecordReaderOutputCollectorTap, final JobConf jobConf) {
- if (filterPredicate != null) {
- ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate);
+ setPredicatePushdown(jobConf);
+ setProjectionPushdown(jobConf);
+ setRecordClass(jobConf);
+ }
+
+ private void setRecordClass(JobConf jobConf) {
+ if (config.klass != null) {
+ ParquetThriftInputFormat.setThriftClass(jobConf, config.klass);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/f637c445/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
index 1fe1a6e..0f46f8f 100644
--- a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
+++ b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
@@ -37,15 +37,17 @@ import parquet.hadoop.thrift.ThriftReadSupport;
public class ParquetScroogeScheme<T extends ThriftStruct> extends ParquetValueScheme<T> {
private static final long serialVersionUID = -8332274507341448397L;
- private final Class<T> klass;
public ParquetScroogeScheme(Class<T> klass) {
- this.klass = klass;
+ this(new Config().withRecordClass(klass));
}
public ParquetScroogeScheme(FilterPredicate filterPredicate, Class<T> klass) {
- super(filterPredicate);
- this.klass = klass;
+ this(new Config().withFilterPredicate(filterPredicate));
+ }
+
+ public ParquetScroogeScheme(Config config) {
+ super(config);
}
@SuppressWarnings("rawtypes")
@@ -66,10 +68,10 @@ public class ParquetScroogeScheme<T extends ThriftStruct> extends ParquetValueSc
@Override
public void sourceConfInit(FlowProcess<JobConf> fp,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+ super.sourceConfInit(fp, tap, jobConf);
jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
ParquetInputFormat.setReadSupportClass(jobConf, ScroogeReadSupport.class);
ThriftReadSupport.setRecordConverterClass(jobConf, ScroogeRecordConverter.class);
- ParquetThriftInputFormat.<T>setThriftClass(jobConf, klass);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/f637c445/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java b/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
index 399ff61..ec04fae 100644
--- a/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
+++ b/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
@@ -15,20 +15,33 @@
*/
package parquet.scrooge;
+import cascading.flow.Flow;
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowConnector;
+import cascading.operation.BaseOperation;
+import cascading.operation.Function;
+import cascading.operation.FunctionCall;
+import cascading.pipe.Each;
+import cascading.pipe.Pipe;
+import cascading.scheme.Scheme;
+import cascading.scheme.hadoop.TextLine;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.Hfs;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.thrift.TBase;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TIOStreamTransport;
import org.junit.Test;
-import parquet.hadoop.ParquetInputFormat;
-import parquet.hadoop.thrift.ParquetThriftInputFormat;
-import parquet.hadoop.thrift.ThriftReadSupport;
+import parquet.cascading.ParquetValueScheme.Config;
import parquet.hadoop.thrift.ThriftToParquetFileWriter;
import parquet.hadoop.util.ContextUtil;
import parquet.scrooge.test.TestPersonWithAllInformation;
@@ -37,6 +50,9 @@ import parquet.thrift.test.Phone;
import parquet.thrift.test.RequiredPrimitiveFixture;
import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -49,11 +65,15 @@ import static org.junit.Assert.assertEquals;
* @author Tianshuo Deng
*/
public class ParquetScroogeSchemeTest {
+
+ public static final String PARQUET_PATH = "target/test/TestParquetToThriftReadProjection/file.parquet";
+ public static final String TXT_OUTPUT_PATH = "target/test/TestParquetToThriftReadProjection/output.txt";
+
@Test
public void testWritePrimitveThriftReadScrooge() throws Exception {
- RequiredPrimitiveFixture toWrite = new RequiredPrimitiveFixture(true, (byte) 2, (short) 3, 4, (long) 5, (double) 6.0, "7");
+ RequiredPrimitiveFixture toWrite = new RequiredPrimitiveFixture(true, (byte)2, (short)3, 4, (long)5, 6.0, "7");
toWrite.setInfo_string("it's info");
- verifyScroogeRead(toWrite, parquet.scrooge.test.RequiredPrimitiveFixture.class, "RequiredPrimitiveFixture(true,2,3,4,5,6.0,7,Some(it's info))","**");
+ verifyScroogeRead(thriftRecords(toWrite), parquet.scrooge.test.RequiredPrimitiveFixture.class, "RequiredPrimitiveFixture(true,2,3,4,5,6.0,7,Some(it's info))\n", "**");
}
@Test
@@ -62,52 +82,75 @@ public class ParquetScroogeSchemeTest {
phoneMap.put("key1", new parquet.thrift.test.Phone("111", "222"));
parquet.thrift.test.TestPersonWithAllInformation toWrite = new parquet.thrift.test.TestPersonWithAllInformation(new parquet.thrift.test.Name("first"), new Address("my_street", "my_zip"), phoneMap);
toWrite.setInfo("my_info");
- String expected = "TestPersonWithAllInformation(Name(first,None),None,Address(my_street,my_zip),None,Some(my_info),Map(key1 -> Phone(111,222)),None,None)";
- verifyScroogeRead(toWrite, TestPersonWithAllInformation.class, expected,"**");
- String expectedProjected = "TestPersonWithAllInformation(Name(first,None),None,Address(my_street,my_zip),None,Some(my_info),Map(),None,None)";
- verifyScroogeRead(toWrite, TestPersonWithAllInformation.class, expectedProjected,"address/*;info;name/first_name");
+
+ String expected = "TestPersonWithAllInformation(Name(first,None),None,Address(my_street,my_zip),None,Some(my_info),Map(key1 -> Phone(111,222)),None,None)\n";
+ verifyScroogeRead(thriftRecords(toWrite), TestPersonWithAllInformation.class, expected, "**");
+
+ String expectedProjected = "TestPersonWithAllInformation(Name(first,None),None,Address(my_street,my_zip),None,Some(my_info),Map(),None,None)\n";
+ verifyScroogeRead(thriftRecords(toWrite), TestPersonWithAllInformation.class, expectedProjected, "address/*;info;name/first_name");
}
- public <T> void verifyScroogeRead(TBase recordToWrite, Class<T> readClass, String expectedStr, String projectionFilter) throws Exception {
- Configuration conf = new Configuration();
- conf.set("parquet.thrift.converter.class", ScroogeRecordConverter.class.getName());
- conf.set(ThriftReadSupport.THRIFT_READ_CLASS_KEY, readClass.getName());
- conf.set(ThriftReadSupport.THRIFT_COLUMN_FILTER_KEY, projectionFilter);
-
- final Path parquetFile = new Path("target/test/TestParquetToThriftReadProjection/file.parquet");
- final FileSystem fs = parquetFile.getFileSystem(conf);
- if (fs.exists(parquetFile)) {
- fs.delete(parquetFile, true);
+ private static class ObjectToStringFunction extends BaseOperation implements Function {
+ @Override
+ public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+ Object record = functionCall.getArguments().getObject(0);
+ Tuple result = new Tuple();
+ result.add(record.toString());
+ functionCall.getOutputCollector().add(result);
}
+ }
+
+ public <T> void verifyScroogeRead(List<TBase> recordsToWrite, Class<T> readClass, String expectedStr, String projectionFilter) throws Exception {
+ Configuration conf = new Configuration();
+ deleteIfExist(PARQUET_PATH);
+ deleteIfExist(TXT_OUTPUT_PATH);
+ final Path parquetFile = new Path(PARQUET_PATH);
+ writeParquetFile(recordsToWrite, conf, parquetFile);
+
+ Scheme sourceScheme = new ParquetScroogeScheme(new Config().withRecordClass(readClass).withProjectionString(projectionFilter));
+ Tap source = new Hfs(sourceScheme, PARQUET_PATH);
+
+ Scheme sinkScheme = new TextLine(new Fields("first", "last"));
+ Tap sink = new Hfs(sinkScheme, TXT_OUTPUT_PATH);
+ Pipe assembly = new Pipe("namecp");
+ assembly = new Each(assembly, new ObjectToStringFunction());
+ Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+ flow.complete();
+ String result = FileUtils.readFileToString(new File(TXT_OUTPUT_PATH + "/part-00000"));
+ assertEquals(expectedStr, result);
+ }
+
+ private void writeParquetFile(List<TBase> recordsToWrite, Configuration conf, Path parquetFile) throws IOException, InterruptedException, org.apache.thrift.TException {
//create a test file
final TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
final TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
- Class writeClass = recordToWrite.getClass();
+ Class writeClass = recordsToWrite.get(0).getClass();
final ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(parquetFile, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, writeClass);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
-
- recordToWrite.write(protocol);
+ for (TBase recordToWrite : recordsToWrite) {
+ recordToWrite.write(protocol);
+ }
w.write(new BytesWritable(baos.toByteArray()));
w.close();
+ }
- final ParquetScroogeInputFormat<T> parquetScroogeInputFormat = new ParquetScroogeInputFormat<T>();
- final Job job = new Job(conf, "read");
- job.setInputFormatClass(ParquetThriftInputFormat.class);
- ParquetThriftInputFormat.setInputPaths(job, parquetFile);
- final JobID jobID = new JobID("local", 1);
- List<InputSplit> splits = parquetScroogeInputFormat.getSplits(ContextUtil.newJobContext(ContextUtil.getConfiguration(job), jobID));
- T readValue = null;
- for (InputSplit split : splits) {
- TaskAttemptContext taskAttemptContext = ContextUtil.newTaskAttemptContext(ContextUtil.getConfiguration(job), new TaskAttemptID(new TaskID(jobID, true, 1), 0));
- final RecordReader<Void, T> reader = parquetScroogeInputFormat.createRecordReader(split, taskAttemptContext);
- reader.initialize(split, taskAttemptContext);
- if (reader.nextKeyValue()) {
- readValue = reader.getCurrentValue();
- }
+ private List<TBase> thriftRecords(TBase... records) {
+ List<TBase> result = new ArrayList<TBase>();
+ for (TBase record : records) {
+ result.add(record);
}
- assertEquals(expectedStr, readValue.toString());
+ return result;
}
+ private void deleteIfExist(String path) throws IOException {
+ Path p = new Path(path);
+ Configuration conf = new Configuration();
+ final FileSystem fs = p.getFileSystem(conf);
+ if (fs.exists(p)) {
+ fs.delete(p, true);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/f637c445/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java
index abf38ff..ca01a3f 100644
--- a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java
+++ b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java
@@ -170,4 +170,8 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
throw new RuntimeException("Unable to create Thrift Converter for Thrift metadata " + thriftMetaData, t);
}
}
+
+ public static void setProjectionPushdown(JobConf jobConf, String projectionString) {
+ jobConf.set(THRIFT_COLUMN_FILTER_KEY, projectionString);
+ }
}