Posted to commits@parquet.apache.org by ju...@apache.org on 2014/09/10 19:38:03 UTC

git commit: PARQUET-87: Add API for projection pushdown on the cascading scheme level

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 24119cc8e -> f637c4458


PARQUET-87: Add API for projection pushdown on the cascading scheme level

JIRA: https://issues.apache.org/jira/browse/PARQUET-87
Previously, the projection pushdown configuration was global rather than bound to a specific tap.
With this API, projection pushdown can be done more "naturally", which may benefit Scalding. Code that uses this API looks like:

```
Scheme sourceScheme = new ParquetScroogeScheme(new Config().withProjectionString(projectionFilter));
Tap source = new Hfs(sourceScheme, PARQUET_PATH);
```
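
Because the Config object also carries the record class and a filter predicate, projection pushdown and predicate pushdown can now be combined on a single tap. A minimal sketch building on the example above, assuming a hypothetical Scrooge-generated class `MyRecord` (the column path is illustrative; the projection glob is the one used in the tests, and the builder methods are the ones added in this commit):

```
import static parquet.filter2.predicate.FilterApi.binaryColumn;
import static parquet.filter2.predicate.FilterApi.eq;

import parquet.filter2.predicate.FilterPredicate;
import parquet.io.api.Binary;

// Keep only rows whose name.first_name column equals "first".
FilterPredicate byName = eq(binaryColumn("name.first_name"), Binary.fromString("first"));

Scheme sourceScheme = new ParquetScroogeScheme(
    new Config()
        .withRecordClass(MyRecord.class)                         // hypothetical record class
        .withProjectionString("address/*;info;name/first_name")  // semicolon-separated globs
        .withFilterPredicate(byName));
Tap source = new Hfs(sourceScheme, PARQUET_PATH);
```

Each with* call returns a new Config, so the calls can be chained in any order.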

Author: Tianshuo Deng <td...@twitter.com>

Closes #51 from tsdeng/projection_from_scheme and squashes the following commits:

2c72757 [Tianshuo Deng] make config class final
813dc1a [Tianshuo Deng] erge branch 'master' into projection_from_scheme
b587b79 [Tianshuo Deng] make constructor of Config private, fix format
3aa7dd2 [Tianshuo Deng] remove builder
9348266 [Tianshuo Deng] use builder()
7c91869 [Tianshuo Deng] make fields of Config private, create builder method for Config
5fdc881 [Tianshuo Deng] builder for setting projection pushdown and predicate pushdown
a47f271 [Tianshuo Deng] immutable
3d514b1 [Tianshuo Deng] done


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/f637c445
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/f637c445
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/f637c445

Branch: refs/heads/master
Commit: f637c4458a3b1dc4ecaa35957adf13ecfbe7d12d
Parents: 24119cc
Author: Tianshuo Deng <td...@twitter.com>
Authored: Wed Sep 10 10:37:51 2014 -0700
Committer: julien <ju...@twitter.com>
Committed: Wed Sep 10 10:37:51 2014 -0700

----------------------------------------------------------------------
 .../parquet/cascading/ParquetTBaseScheme.java   |  23 ++--
 .../parquet/cascading/ParquetValueScheme.java   |  78 +++++++++++-
 .../parquet/scrooge/ParquetScroogeScheme.java   |  12 +-
 .../scrooge/ParquetScroogeSchemeTest.java       | 119 +++++++++++++------
 .../hadoop/thrift/ThriftReadSupport.java        |   4 +
 5 files changed, 175 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/f637c445/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
index 111c7ab..40817af 100644
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
+++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetTBaseScheme.java
@@ -33,38 +33,35 @@ import parquet.thrift.TBaseRecordConverter;
 
 public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> {
 
-  private Class<T> thriftClass;
-
   // In the case of reads, we can read the thrift class from the file metadata
   public ParquetTBaseScheme() {
+    this(new Config());
   }
 
   public ParquetTBaseScheme(Class<T> thriftClass) {
-    this.thriftClass = thriftClass;
+    this(new Config().withRecordClass(thriftClass));
   }
 
   public ParquetTBaseScheme(FilterPredicate filterPredicate) {
-    super(filterPredicate);
+    this(new Config().withFilterPredicate(filterPredicate));
   }
 
   public ParquetTBaseScheme(FilterPredicate filterPredicate, Class<T> thriftClass) {
-    super(filterPredicate);
-    this.thriftClass = thriftClass;
+    this(new Config().withRecordClass(thriftClass).withFilterPredicate(filterPredicate));
+  }
+
+  public ParquetTBaseScheme(Config config) {
+    super(config);
   }
 
   @SuppressWarnings("rawtypes")
   @Override
   public void sourceConfInit(FlowProcess<JobConf> fp,
       Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
-
     super.sourceConfInit(fp, tap, jobConf);
     jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
     ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class);
     ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class);
-
-    if (thriftClass != null) {
-      ParquetThriftInputFormat.setThriftClass(jobConf, thriftClass);
-    }
   }
 
   @SuppressWarnings("rawtypes")
@@ -72,12 +69,12 @@ public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme
   public void sinkConfInit(FlowProcess<JobConf> fp,
       Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
 
-    if (thriftClass == null) {
+    if (this.config.getKlass() == null) {
       throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor");
     }
 
     jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
     DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, ThriftWriteSupport.class);
-    ThriftWriteSupport.<T>setThriftClass(jobConf, thriftClass);
+    ThriftWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/f637c445/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java b/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java
index 5296aee..6e8c13a 100644
--- a/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java
+++ b/parquet-cascading/src/main/java/parquet/cascading/ParquetValueScheme.java
@@ -16,6 +16,7 @@
 package parquet.cascading;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -31,6 +32,10 @@ import cascading.tuple.TupleEntry;
 import parquet.filter2.predicate.FilterPredicate;
 import parquet.hadoop.ParquetInputFormat;
 import parquet.hadoop.mapred.Container;
+import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import parquet.hadoop.thrift.ParquetThriftInputFormat;
+import parquet.hadoop.thrift.ThriftReadSupport;
+import parquet.hadoop.thrift.ThriftWriteSupport;
 
 import static parquet.Preconditions.checkNotNull;
 
@@ -42,21 +47,84 @@ import static parquet.Preconditions.checkNotNull;
  * correctly in the respective Init methods.
  */
 public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
+
+  public static final class Config<T> implements Serializable {
+    private final FilterPredicate filterPredicate;
+    private final String projectionString;
+    private final Class<T> klass;
+    private Config(Class<T> klass, FilterPredicate filterPredicate, String projectionString) {
+      this.filterPredicate = filterPredicate;
+      this.projectionString = projectionString;
+      this.klass = klass;
+    }
+
+    public Config() {
+      filterPredicate = null;
+      projectionString = null;
+      klass = null;
+    }
+
+    public FilterPredicate getFilterPredicate() {
+      return filterPredicate;
+    }
+
+    public String getProjectionString() {
+      return projectionString;
+    }
+
+    public Class<T> getKlass() {
+      return klass;
+    }
+
+    public Config withFilterPredicate(FilterPredicate f) {
+      return new Config(this.klass, checkNotNull(f, "filterPredicate"), this.projectionString);
+    }
+
+    public Config withProjectionString(String p) {
+      return new Config(this.klass, this.filterPredicate, checkNotNull(p, "projectionFilter"));
+    }
+
+    public Config withRecordClass(Class<T> klass) {
+      return new Config(checkNotNull(klass, "recordClass"), this.filterPredicate, this.projectionString);
+    }
+  }
+
   private static final long serialVersionUID = 157560846420730043L;
-  private final FilterPredicate filterPredicate;
+  protected final Config config;
 
   public ParquetValueScheme() {
-    this.filterPredicate = null;
+    this(new Config());
   }
 
   public ParquetValueScheme(FilterPredicate filterPredicate) {
-    this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+    this(new Config().withFilterPredicate(filterPredicate));
   }
 
+  public ParquetValueScheme(Config config) {
+    this.config = config;
+  }
+
+  private void setProjectionPushdown(JobConf jobConf) {
+    if (this.config.projectionString != null) {
+      ThriftReadSupport.setProjectionPushdown(jobConf, this.config.projectionString);
+    }
+  }
+
+  private void setPredicatePushdown(JobConf jobConf) {
+    if (this.config.filterPredicate != null) {
+      ParquetInputFormat.setFilterPredicate(jobConf, this.config.filterPredicate);
+    }
+  }
   @Override
   public void sourceConfInit(FlowProcess<JobConf> jobConfFlowProcess, Tap<JobConf, RecordReader, OutputCollector> jobConfRecordReaderOutputCollectorTap, final JobConf jobConf) {
-    if (filterPredicate != null) {
-      ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate);
+    setPredicatePushdown(jobConf);
+    setProjectionPushdown(jobConf);
+    setRecordClass(jobConf);
+  }
+
+  private void setRecordClass(JobConf jobConf) {
+    if (config.klass != null) {
+      ParquetThriftInputFormat.setThriftClass(jobConf, config.klass);
     }
   }
 

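As the constructors above show, Config is immutable: every with* method builds a fresh Config instead of mutating fields (hence the "immutable" and "make config class final" commits in the squash list). A short sketch of why that matters, with hypothetical names:

```
// A shared base config can be specialized per tap without side effects.
Config base = new Config().withRecordClass(MyRecord.class);  // MyRecord is hypothetical

Config wideRead   = base.withProjectionString("**");
Config narrowRead = base.withProjectionString("name/first_name");
// base itself still has no projection string set.
```
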
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/f637c445/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
index 1fe1a6e..0f46f8f 100644
--- a/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
+++ b/parquet-scrooge/src/main/java/parquet/scrooge/ParquetScroogeScheme.java
@@ -37,15 +37,17 @@ import parquet.hadoop.thrift.ThriftReadSupport;
 public class ParquetScroogeScheme<T extends ThriftStruct> extends ParquetValueScheme<T> {
 
   private static final long serialVersionUID = -8332274507341448397L;
-  private final Class<T> klass;
 
   public ParquetScroogeScheme(Class<T> klass) {
-    this.klass = klass;
+    this(new Config().withRecordClass(klass));
   }
 
   public ParquetScroogeScheme(FilterPredicate filterPredicate, Class<T> klass) {
-    super(filterPredicate);
-    this.klass = klass;
+    this(new Config().withRecordClass(klass).withFilterPredicate(filterPredicate));
+  }
+
+  public ParquetScroogeScheme(Config config) {
+    super(config);
   }
 
   @SuppressWarnings("rawtypes")
@@ -66,10 +68,10 @@ public class ParquetScroogeScheme<T extends ThriftStruct> extends ParquetValueSc
   @Override
   public void sourceConfInit(FlowProcess<JobConf> fp,
       Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+    super.sourceConfInit(fp, tap, jobConf);
     jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
     ParquetInputFormat.setReadSupportClass(jobConf, ScroogeReadSupport.class);
     ThriftReadSupport.setRecordConverterClass(jobConf, ScroogeRecordConverter.class);
-    ParquetThriftInputFormat.<T>setThriftClass(jobConf, klass);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/f637c445/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java b/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
index 399ff61..ec04fae 100644
--- a/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
+++ b/parquet-scrooge/src/test/java/parquet/scrooge/ParquetScroogeSchemeTest.java
@@ -15,20 +15,33 @@
  */
 package parquet.scrooge;
 
+import cascading.flow.Flow;
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowConnector;
+import cascading.operation.BaseOperation;
+import cascading.operation.Function;
+import cascading.operation.FunctionCall;
+import cascading.pipe.Each;
+import cascading.pipe.Pipe;
+import cascading.scheme.Scheme;
+import cascading.scheme.hadoop.TextLine;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.Hfs;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.thrift.TBase;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TIOStreamTransport;
 import org.junit.Test;
-import parquet.hadoop.ParquetInputFormat;
-import parquet.hadoop.thrift.ParquetThriftInputFormat;
-import parquet.hadoop.thrift.ThriftReadSupport;
+import parquet.cascading.ParquetValueScheme.Config;
 import parquet.hadoop.thrift.ThriftToParquetFileWriter;
 import parquet.hadoop.util.ContextUtil;
 import parquet.scrooge.test.TestPersonWithAllInformation;
@@ -37,6 +50,9 @@ import parquet.thrift.test.Phone;
 import parquet.thrift.test.RequiredPrimitiveFixture;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -49,11 +65,15 @@ import static org.junit.Assert.assertEquals;
  * @author Tianshuo Deng
  */
 public class ParquetScroogeSchemeTest {
+
+  public static final String PARQUET_PATH = "target/test/TestParquetToThriftReadProjection/file.parquet";
+  public static final String TXT_OUTPUT_PATH = "target/test/TestParquetToThriftReadProjection/output.txt";
+
   @Test
   public void testWritePrimitveThriftReadScrooge() throws Exception {
-    RequiredPrimitiveFixture toWrite = new RequiredPrimitiveFixture(true, (byte) 2, (short) 3, 4, (long) 5, (double) 6.0, "7");
+    RequiredPrimitiveFixture toWrite = new RequiredPrimitiveFixture(true, (byte)2, (short)3, 4, (long)5, 6.0, "7");
     toWrite.setInfo_string("it's info");
-    verifyScroogeRead(toWrite, parquet.scrooge.test.RequiredPrimitiveFixture.class, "RequiredPrimitiveFixture(true,2,3,4,5,6.0,7,Some(it's info))","**");
+    verifyScroogeRead(thriftRecords(toWrite), parquet.scrooge.test.RequiredPrimitiveFixture.class, "RequiredPrimitiveFixture(true,2,3,4,5,6.0,7,Some(it's info))\n", "**");
   }
 
   @Test
@@ -62,52 +82,75 @@ public class ParquetScroogeSchemeTest {
     phoneMap.put("key1", new parquet.thrift.test.Phone("111", "222"));
     parquet.thrift.test.TestPersonWithAllInformation toWrite = new parquet.thrift.test.TestPersonWithAllInformation(new parquet.thrift.test.Name("first"), new Address("my_street", "my_zip"), phoneMap);
     toWrite.setInfo("my_info");
-    String expected = "TestPersonWithAllInformation(Name(first,None),None,Address(my_street,my_zip),None,Some(my_info),Map(key1 -> Phone(111,222)),None,None)";
-    verifyScroogeRead(toWrite, TestPersonWithAllInformation.class, expected,"**");
-    String expectedProjected = "TestPersonWithAllInformation(Name(first,None),None,Address(my_street,my_zip),None,Some(my_info),Map(),None,None)";
-    verifyScroogeRead(toWrite, TestPersonWithAllInformation.class, expectedProjected,"address/*;info;name/first_name");
+
+    String expected = "TestPersonWithAllInformation(Name(first,None),None,Address(my_street,my_zip),None,Some(my_info),Map(key1 -> Phone(111,222)),None,None)\n";
+    verifyScroogeRead(thriftRecords(toWrite), TestPersonWithAllInformation.class, expected, "**");
+
+    String expectedProjected = "TestPersonWithAllInformation(Name(first,None),None,Address(my_street,my_zip),None,Some(my_info),Map(),None,None)\n";
+    verifyScroogeRead(thriftRecords(toWrite), TestPersonWithAllInformation.class, expectedProjected, "address/*;info;name/first_name");
   }
 
-  public <T> void verifyScroogeRead(TBase recordToWrite, Class<T> readClass, String expectedStr, String projectionFilter) throws Exception {
-    Configuration conf = new Configuration();
-    conf.set("parquet.thrift.converter.class", ScroogeRecordConverter.class.getName());
-    conf.set(ThriftReadSupport.THRIFT_READ_CLASS_KEY, readClass.getName());
-    conf.set(ThriftReadSupport.THRIFT_COLUMN_FILTER_KEY, projectionFilter);
-
-    final Path parquetFile = new Path("target/test/TestParquetToThriftReadProjection/file.parquet");
-    final FileSystem fs = parquetFile.getFileSystem(conf);
-    if (fs.exists(parquetFile)) {
-      fs.delete(parquetFile, true);
+  private static class ObjectToStringFunction extends BaseOperation implements Function {
+    @Override
+    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+      Object record = functionCall.getArguments().getObject(0);
+      Tuple result = new Tuple();
+      result.add(record.toString());
+      functionCall.getOutputCollector().add(result);
     }
+  }
+
+  public <T> void verifyScroogeRead(List<TBase> recordsToWrite, Class<T> readClass, String expectedStr, String projectionFilter) throws Exception {
+    Configuration conf = new Configuration();
+    deleteIfExist(PARQUET_PATH);
+    deleteIfExist(TXT_OUTPUT_PATH);
+    final Path parquetFile = new Path(PARQUET_PATH);
+    writeParquetFile(recordsToWrite, conf, parquetFile);
+
+    Scheme sourceScheme = new ParquetScroogeScheme(new Config().withRecordClass(readClass).withProjectionString(projectionFilter));
+    Tap source = new Hfs(sourceScheme, PARQUET_PATH);
+
+    Scheme sinkScheme = new TextLine(new Fields("first", "last"));
+    Tap sink = new Hfs(sinkScheme, TXT_OUTPUT_PATH);
 
+    Pipe assembly = new Pipe("namecp");
+    assembly = new Each(assembly, new ObjectToStringFunction());
+    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+    flow.complete();
+    String result = FileUtils.readFileToString(new File(TXT_OUTPUT_PATH + "/part-00000"));
+    assertEquals(expectedStr, result);
+  }
+
+  private void writeParquetFile(List<TBase> recordsToWrite, Configuration conf, Path parquetFile) throws IOException, InterruptedException, org.apache.thrift.TException {
     //create a test file
     final TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
     final TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
-    Class writeClass = recordToWrite.getClass();
+    Class writeClass = recordsToWrite.get(0).getClass();
     final ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(parquetFile, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, writeClass);
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
     final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
-
-    recordToWrite.write(protocol);
+    for (TBase recordToWrite : recordsToWrite) {
+      recordToWrite.write(protocol);
+    }
     w.write(new BytesWritable(baos.toByteArray()));
     w.close();
+  }
 
-    final ParquetScroogeInputFormat<T> parquetScroogeInputFormat = new ParquetScroogeInputFormat<T>();
-    final Job job = new Job(conf, "read");
-    job.setInputFormatClass(ParquetThriftInputFormat.class);
-    ParquetThriftInputFormat.setInputPaths(job, parquetFile);
-    final JobID jobID = new JobID("local", 1);
-    List<InputSplit> splits = parquetScroogeInputFormat.getSplits(ContextUtil.newJobContext(ContextUtil.getConfiguration(job), jobID));
-    T readValue = null;
-    for (InputSplit split : splits) {
-      TaskAttemptContext taskAttemptContext = ContextUtil.newTaskAttemptContext(ContextUtil.getConfiguration(job), new TaskAttemptID(new TaskID(jobID, true, 1), 0));
-      final RecordReader<Void, T> reader = parquetScroogeInputFormat.createRecordReader(split, taskAttemptContext);
-      reader.initialize(split, taskAttemptContext);
-      if (reader.nextKeyValue()) {
-        readValue = reader.getCurrentValue();
-      }
+  private List<TBase> thriftRecords(TBase... records) {
+    List<TBase> result = new ArrayList<TBase>();
+    for (TBase record : records) {
+      result.add(record);
     }
-    assertEquals(expectedStr, readValue.toString());
+    return result;
   }
 
+  private void deleteIfExist(String path) throws IOException {
+    Path p = new Path(path);
+    Configuration conf = new Configuration();
+    final FileSystem fs = p.getFileSystem(conf);
+    if (fs.exists(p)) {
+      fs.delete(p, true);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/f637c445/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java
index abf38ff..ca01a3f 100644
--- a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java
+++ b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java
@@ -170,4 +170,8 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
       throw new RuntimeException("Unable to create Thrift Converter for Thrift metadata " + thriftMetaData, t);
     }
   }
+
+  public static void setProjectionPushdown(JobConf jobConf, String projectionString) {
+    jobConf.set(THRIFT_COLUMN_FILTER_KEY, projectionString);
+  }
 }
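
The new setProjectionPushdown helper simply records the projection glob in the job configuration under THRIFT_COLUMN_FILTER_KEY, the same key ParquetValueScheme now sets from its Config, so plain MapReduce jobs can request the pushdown directly. A minimal sketch:

```
import org.apache.hadoop.mapred.JobConf;
import parquet.hadoop.thrift.ThriftReadSupport;

JobConf jobConf = new JobConf();
// Equivalent to setting ThriftReadSupport.THRIFT_COLUMN_FILTER_KEY by hand.
ThriftReadSupport.setProjectionPushdown(jobConf, "address/*;info");
```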