Posted to commits@pig.apache.org by ro...@apache.org on 2018/04/13 22:01:58 UTC

svn commit: r1829112 - in /pig/trunk: CHANGES.txt ivy.xml ivy/libraries.properties src/org/apache/pig/builtin/ParquetLoader.java src/org/apache/pig/builtin/ParquetStorer.java test/org/apache/pig/test/TestSplitCombine.java

Author: rohini
Date: Fri Apr 13 22:01:58 2018
New Revision: 1829112

URL: http://svn.apache.org/viewvc?rev=1829112&view=rev
Log:
PIG-4092: Predicate pushdown for Parquet (nkollar via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries.properties
    pig/trunk/src/org/apache/pig/builtin/ParquetLoader.java
    pig/trunk/src/org/apache/pig/builtin/ParquetStorer.java
    pig/trunk/test/org/apache/pig/test/TestSplitCombine.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1829112&r1=1829111&r2=1829112&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr 13 22:01:58 2018
@@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley
  
 IMPROVEMENTS
 
+PIG-4092: Predicate pushdown for Parquet (nkollar via rohini)
+
 PIG-5317: Upgrade old dependencies: commons-lang, hsqldb, commons-logging (nkollar via rohini)
 
 PIG-5322: ConstantCalculator optimizer is not applied for split (rohini)

Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1829112&r1=1829111&r2=1829112&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Fri Apr 13 22:01:58 2018
@@ -390,7 +390,7 @@
 
     <dependency org="org.mockito" name="mockito-all" rev="${mockito.version}" conf="test->default"/>
 
-    <dependency org="com.twitter" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/>
+    <dependency org="org.apache.parquet" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/>
 
     <!-- for Spark 1.x integration -->
     <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark1.version}" conf="spark1->default">

Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1829112&r1=1829111&r2=1829112&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Fri Apr 13 22:01:58 2018
@@ -87,7 +87,7 @@ jansi.version=1.9
 asm.version=3.3.1
 snappy-java.version=1.1.1.3
 tez.version=0.7.0
-parquet-pig-bundle.version=1.2.3
+parquet-pig-bundle.version=1.9.0
 snappy.version=0.2
 leveldbjni.version=1.8
 curator.version=2.6.0

Modified: pig/trunk/src/org/apache/pig/builtin/ParquetLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/ParquetLoader.java?rev=1829112&r1=1829111&r2=1829112&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/ParquetLoader.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/ParquetLoader.java Fri Apr 13 22:01:58 2018
@@ -20,8 +20,10 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.Expression;
 import org.apache.pig.LoadFuncMetadataWrapper;
 import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPredicatePushdown;
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.JarManager;
@@ -29,7 +31,7 @@ import org.apache.pig.impl.util.JarManag
 /**
  * Wrapper class which will delegate calls to parquet.pig.ParquetLoader
  */
-public class ParquetLoader extends LoadFuncMetadataWrapper implements LoadPushDown {
+public class ParquetLoader extends LoadFuncMetadataWrapper implements LoadPushDown, LoadPredicatePushdown {
 
     public ParquetLoader() throws FrontendException {
         this(null);
@@ -37,12 +39,12 @@ public class ParquetLoader extends LoadF
     
     public ParquetLoader(String requestedSchemaStr) throws FrontendException {
         try {
-            init(new parquet.pig.ParquetLoader(requestedSchemaStr));
+            init(new org.apache.parquet.pig.ParquetLoader(requestedSchemaStr));
         }
         // if compile time dependency not found at runtime
         catch (NoClassDefFoundError e) {
             throw new FrontendException(String.format("Cannot instantiate class %s (%s)",
-                    getClass().getName(), "parquet.pig.ParquetLoader"), 2259, e);
+                    getClass().getName(), "org.apache.parquet.pig.ParquetLoader"), 2259, e);
         }
     }
     
@@ -52,7 +54,7 @@ public class ParquetLoader extends LoadF
     
     @Override
     public void setLocation(String location, Job job) throws IOException {
-        JarManager.addDependencyJars(job, parquet.Version.class);
+        JarManager.addDependencyJars(job, org.apache.parquet.Version.class);
         super.setLocation(location, job);
     }
 
@@ -66,5 +68,19 @@ public class ParquetLoader extends LoadF
             throws FrontendException {
         return ((LoadPushDown)super.loadFunc()).pushProjection(requiredFieldList);
     }
-    
+
+    @Override
+    public List<String> getPredicateFields(String location, Job job) throws IOException {
+        return ((LoadPredicatePushdown)super.loadFunc()).getPredicateFields(location, job);
+    }
+
+    @Override
+    public List<Expression.OpType> getSupportedExpressionTypes() {
+        return ((LoadPredicatePushdown)super.loadFunc()).getSupportedExpressionTypes();
+    }
+
+    @Override
+    public void setPushdownPredicate(Expression predicate) throws IOException {
+        ((LoadPredicatePushdown)super.loadFunc()).setPushdownPredicate(predicate);
+    }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/ParquetStorer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/ParquetStorer.java?rev=1829112&r1=1829111&r2=1829112&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/ParquetStorer.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/ParquetStorer.java Fri Apr 13 22:01:58 2018
@@ -31,12 +31,12 @@ public class ParquetStorer extends Store
 
     public ParquetStorer() throws FrontendException {
         try {
-            init(new parquet.pig.ParquetStorer());
+            init(new org.apache.parquet.pig.ParquetStorer());
         }
         // if compile time dependency not found at runtime
         catch (NoClassDefFoundError e) {
             throw new FrontendException(String.format("Cannot instantiate class %s (%s)",
-                    getClass().getName(), "parquet.pig.ParquetStorer"), 2259, e);
+                    getClass().getName(), "org.apache.parquet.pig.ParquetStorer"), 2259, e);
         }
     }
     
@@ -49,7 +49,7 @@ public class ParquetStorer extends Store
      */
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
-        JarManager.addDependencyJars(job, parquet.Version.class);
+        JarManager.addDependencyJars(job, org.apache.parquet.Version.class);
         super.setStoreLocation(location, job);
     }
     

Modified: pig/trunk/test/org/apache/pig/test/TestSplitCombine.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java?rev=1829112&r1=1829111&r2=1829112&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSplitCombine.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSplitCombine.java Fri Apr 13 22:01:58 2018
@@ -41,8 +41,8 @@ import org.apache.pig.impl.plan.Operator
 import org.junit.Before;
 import org.junit.Test;
 
-import parquet.hadoop.ParquetInputSplit;
-import parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 
 public class TestSplitCombine {
     private Configuration conf;
@@ -527,7 +527,7 @@ public class TestSplitCombine {
         // first split is parquetinputsplit
         rawSplits.add(new ParquetInputSplit(new Path("path1"), 0, 100,
                 new String[] { "l1", "l2", "l3" },
-                new ArrayList<BlockMetaData>(), "", "",
+                new ArrayList<BlockMetaData>(), "message dummy {}", "",
                 new HashMap<String, String>(), new HashMap<String, String>()));
         // second split is file split
         rawSplits.add(new FileSplit(new Path("path2"), 0, 400, new String[] {
@@ -559,7 +559,7 @@ public class TestSplitCombine {
             Assert.assertEquals(500, anotherSplit.getLength());
 
             Assert.assertEquals(2, anotherSplit.getNumPaths());
-            Assert.assertEquals("parquet.hadoop.ParquetInputSplit",
+            Assert.assertEquals("org.apache.parquet.hadoop.ParquetInputSplit",
                     (anotherSplit.getWrappedSplit(0).getClass().getName()));
             Assert.assertEquals(
                     "org.apache.hadoop.mapreduce.lib.input.FileSplit",