You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/08/04 23:35:47 UTC

svn commit: r1615805 [1/2] - in /pig/trunk: ./ ivy/ shims/test/hadoop20/org/apache/pig/test/ shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/optimiz...

Author: daijy
Date: Mon Aug  4 21:35:47 2014
New Revision: 1615805

URL: http://svn.apache.org/r1615805
Log:
PIG-4091: Predicate pushdown for ORC

Added:
    pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java
    pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java
    pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
    pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries.properties
    pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
    pig/trunk/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java
    pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
    pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
    pig/trunk/src/org/apache/pig/Expression.java
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
    pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
    pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java
    pig/trunk/test/org/apache/pig/builtin/TestOrcStorage.java
    pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
    pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Aug  4 21:35:47 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4091: Predicate pushdown for ORC (rohini via daijy)
+
 PIG-4077: Some fixes and e2e test for OrcStorage (rohini)
 
 PIG-4054: Do not create job.jar when submitting job (daijy)

Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Mon Aug  4 21:35:47 2014
@@ -420,6 +420,8 @@
       conf="hadoop20->master" />
     <dependency org="org.iq80.snappy" name="snappy" rev="${snappy.version}"
       conf="test->master" />
+    <dependency org="com.esotericsoftware.kryo" name="kryo" rev="${kryo.version}"
+      conf="test->master" />
 
     <dependency org="org.vafer" name="jdeb" rev="${jdeb.version}"
       conf="compile->master">

Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Mon Aug  4 21:35:47 2014
@@ -67,6 +67,7 @@ json-simple.version=1.1
 junit.version=4.11
 jruby.version=1.6.7
 jython.version=2.5.3
+kryo.version=2.22
 rhino.version=1.7R2
 antlr.version=3.4
 stringtemplate.version=4.0.4

Modified: pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java Mon Aug  4 21:35:47 2014
@@ -43,7 +43,7 @@ public class MiniCluster extends MiniGen
     }
 
     @Override
-    protected ExecType getExecType() {
+    public ExecType getExecType() {
         return ExecType.MAPREDUCE;
     }
 

Modified: pig/trunk/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java Mon Aug  4 21:35:47 2014
@@ -29,7 +29,7 @@ public class TezMiniCluster extends Mini
     }
 
     @Override
-    protected ExecType getExecType() {
+    public ExecType getExecType() {
         throw new UnsupportedOperationException();
     }
 

Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java Mon Aug  4 21:35:47 2014
@@ -54,7 +54,7 @@ public class MiniCluster extends MiniGen
     }
 
     @Override
-    protected ExecType getExecType() {
+    public ExecType getExecType() {
         return ExecType.MAPREDUCE;
     }
 

Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Mon Aug  4 21:35:47 2014
@@ -54,7 +54,7 @@ public class TezMiniCluster extends Mini
     private Configuration m_mr_conf = null;
 
     @Override
-    protected ExecType getExecType() {
+    public ExecType getExecType() {
         return TEZ;
     }
 

Modified: pig/trunk/src/org/apache/pig/Expression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Expression.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Expression.java (original)
+++ pig/trunk/src/org/apache/pig/Expression.java Mon Aug  4 21:35:47 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.pig;
 
+import java.util.List;
+
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 
@@ -47,6 +49,14 @@ public abstract class Expression {
         OP_LE(" <= "),
         OP_MATCH(" matches "),
 
+        //Only used by PredicatePushdown, not used by PartitionPushdown
+        OP_IN (" in "),
+        OP_BETWEEN (" between "),
+
+        //unary ops
+        OP_NULL(" is null"),
+        OP_NOT(" not"),
+
         //binary logical
         OP_AND(" and "),
         OP_OR(" or "),
@@ -76,8 +86,86 @@ public abstract class Expression {
         return opType;
     }
 
+    //TODO: Apply an optimizer to Expression from PredicatePushdownOptimizer and
+    // convert OR clauses to BETWEEN or IN
+    /** Right-hand side of a BETWEEN predicate: the lower and upper bounds of the range. */
+    public static class BetweenExpression extends Expression {
+
+        private final Object lower;
+        private final Object upper;
+
+        public BetweenExpression(Object lower, Object upper) {
+            this.opType = OpType.OP_BETWEEN;
+            this.lower = lower;
+            this.upper = upper;
+        }
+
+        public Object getLower() {
+            return lower;
+        }
+
+        public Object getUpper() {
+            return upper;
+        }
+
+        @Override
+        public String toString() {
+            return " between " + lower + " and " + upper;
+        }
+    }
+
+    /** Right-hand side of an IN predicate: the list of candidate values. */
+    public static class InExpression extends Expression {
+        private final List<Object> values;
+
+        public InExpression(List<Object> values) {
+            this.opType = OpType.OP_IN;
+            this.values = values;
+        }
+
+        public List<Object> getValues() {
+            return values;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append(" in (");
+            for (Object value : values) {
+                if (value instanceof String) {
+                    sb.append("'").append(value).append("', ");
+                } else {
+                    sb.append(value).append(", ");
+                }
+            }
+            // Strip the complete two-character ", " separator left after the last value
+            sb.setLength(sb.length() - (values.isEmpty() ? 0 : 2));
+            sb.append(")");
+            return sb.toString();
+        }
+    }
+
+    public static class UnaryExpression extends Expression {
 
+        Expression expr; // the single operand, e.g. the column tested by OP_NULL
 
+        public UnaryExpression(Expression expr, OpType opType) {
+            this.expr = expr;
+            this.opType = opType;
+        }
+
+        /** @return the operand this unary operator applies to */
+        public Expression getExpression() {
+            return expr;
+        }
+
+        @Override
+        public String toString() {
+            // Operand first, then the operator, e.g. "((col is null) not)" for a negated null check.
+            // TODO: render OP_NOT over OP_NULL as "(col is not null)" in case anyone relies on toString().
+            return new StringBuilder("(").append(expr.toString()).append(opType.toString()).append(')').toString();
+        }
+    }
 
     public static class BinaryExpression extends Expression {
 

Added: pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java?rev=1615805&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java (added)
+++ pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java Mon Aug  4 21:35:47 2014
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * This interface defines how a loader can support predicate pushdown.
+ * If a loader implements this interface, Pig pushes down predicates based on the
+ * types of operations the loader supports on the given set of fields.
+ * @since Pig 0.14
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface LoadPredicatePushdown {
+    /**
+     * Finds the fields of the data that can support predicate pushdown.
+     * @param location Location as returned by
+     * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
+     *
+     * @param job The {@link Job} object - this should be used only to obtain
+     * cluster properties through {@link Job#getConfiguration()} and not to set/query
+     * any runtime job information.
+     *
+     * @return list of field names that can be pushed down. Implementations
+     * should return null to indicate that there are no fields that support predicate pushdown
+     *
+     * @throws IOException if an exception occurs while retrieving predicate fields
+     */
+    List<String> getPredicateFields(String location, Job job)
+            throws IOException;
+
+    /**
+     * Indicates which expression operator types the loader supports for predicate pushdown.
+     *
+     * @return list of the {@link Expression.OpType}s the loader accepts in a pushed-down predicate
+     */
+    List<Expression.OpType> getSupportedExpressionTypes();
+
+    /**
+     * Pushes the filter expression down to the loader.
+     *
+     * @param predicate expression to be filtered by the loader
+     * @throws IOException if the predicate cannot be pushed down to the loader
+     */
+    void setPushdownPredicate(Expression predicate) throws IOException;
+
+}
+

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Mon Aug  4 21:35:47 2014
@@ -28,7 +28,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
-import java.io.Reader;
 import java.io.StringReader;
 import java.io.Writer;
 import java.text.ParseException;
@@ -872,7 +871,8 @@ public class Main {
             System.out.println("            ColumnMapKeyPrune - Remove unused data");
             System.out.println("            AddForEach - Add ForEach to remove unneeded columns");
             System.out.println("            MergeForEach - Merge adjacent ForEach");
-            System.out.println("            GroupByConstParallelSetter - Force parallel 1 for \"group all\" statement");
+            System.out.println("            PartitionFilterOptimizer - Pushdown partition filter conditions to loader implementing LoadMetaData");
+            System.out.println("            PredicatePushdownOptimizer - Pushdown filter predicates to loader implementing LoadPredicatePushDown");
             System.out.println("            All - Disable all optimizations");
             System.out.println("        All optimizations listed here are enabled by default. Optimization values are case insensitive.");
             System.out.println("    -v, -verbose - Print all error messages to screen");

Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Mon Aug  4 21:35:47 2014
@@ -18,10 +18,13 @@
 package org.apache.pig.builtin;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -34,6 +37,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
@@ -43,7 +47,10 @@ import org.apache.hadoop.hive.ql.io.orc.
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.Version;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -56,13 +63,21 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.pig.Expression;
+import org.apache.pig.Expression.BetweenExpression;
+import org.apache.pig.Expression.Column;
+import org.apache.pig.Expression.Const;
+import org.apache.pig.Expression.InExpression;
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.Expression.UnaryExpression;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPredicatePushdown;
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.StoreFunc;
+import org.apache.pig.Expression.BinaryExpression;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFuncInterface;
@@ -75,6 +90,9 @@ import org.apache.pig.impl.util.ObjectSe
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.impl.util.orc.OrcUtils;
+import org.joda.time.DateTime;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * A load function and store function for ORC file.
@@ -92,7 +110,11 @@ import org.apache.pig.impl.util.orc.OrcU
  * <li><code>-v, --version</code> Sets the version of the file that will be written
  * </ul>
  **/
-public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown {
+public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown {
+
+    //TODO Make OrcInputFormat.SARG_PUSHDOWN visible
+    private static final String SARG_PUSHDOWN = "sarg.pushdown";
+
     protected RecordReader in = null;
     protected RecordWriter writer = null;
     private TypeInfo typeInfo = null;
@@ -114,6 +136,7 @@ public class OrcStorage extends LoadFunc
 
     private static final String SchemaSignatureSuffix = "_schema";
     private static final String RequiredColumnsSuffix = "_columns";
+    private static final String SearchArgsSuffix = "_sarg";
 
     static {
         validOptions = new Options();
@@ -267,28 +290,67 @@ public class OrcStorage extends LoadFunc
         if (typeInfo != null && oi == null) {
             oi = OrcStruct.createObjectInspector(typeInfo);
         }
-        if (!UDFContext.getUDFContext().isFrontend() &&
-                p.getProperty(signature+RequiredColumnsSuffix)!=null) {
-            mRequiredColumns = (boolean[])ObjectSerializer.deserialize(p.getProperty(signature+RequiredColumnsSuffix));
-            job.getConfiguration().setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
-            job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
-                    getReqiredColumnIdString(mRequiredColumns));
+        if (!UDFContext.getUDFContext().isFrontend()) {
+            if (p.getProperty(signature + RequiredColumnsSuffix) != null) {
+                mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p
+                        .getProperty(signature + RequiredColumnsSuffix));
+                job.getConfiguration().setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+                job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
+                        getReqiredColumnIdString(mRequiredColumns));
+                if (p.getProperty(signature + SearchArgsSuffix) != null) {
+                    // Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
+                    job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
+                            getReqiredColumnNamesString(getSchema(location, job), mRequiredColumns));
+                }
+            } else if (p.getProperty(signature + SearchArgsSuffix) != null) {
+                // Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
+                job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
+                        getReqiredColumnNamesString(getSchema(location, job)));
+            }
+            if (p.getProperty(signature + SearchArgsSuffix) != null) {
+                job.getConfiguration().set(SARG_PUSHDOWN, p.getProperty(signature + SearchArgsSuffix));
+            }
+
         }
         FileInputFormat.setInputPaths(job, location);
     }
 
-    private static String getReqiredColumnIdString(boolean[] requiredColumns) {
-        String result = "";
-        for (int i=0;i<requiredColumns.length;i++) {
+    private String getReqiredColumnIdString(boolean[] requiredColumns) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < requiredColumns.length; i++) {
+            if (requiredColumns[i]) {
+                sb.append(i).append(",");
+            }
+        }
+        if (sb.length() > 0) { // drop the trailing comma; no required column yields ""
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        return sb.toString();
+    }
+
+    private String getReqiredColumnNamesString(ResourceSchema schema) {
+        StringBuilder sb = new StringBuilder();
+        for (ResourceFieldSchema field : schema.getFields()) {
+            sb.append(field.getName()).append(",");
+        }
+        if (sb.length() > 0) { // drop the trailing comma; an empty schema yields ""
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        return sb.toString();
+    }
+
+    private String getReqiredColumnNamesString(ResourceSchema schema, boolean[] requiredColumns) {
+        StringBuilder sb = new StringBuilder();
+        ResourceFieldSchema[] fields = schema.getFields();
+        for (int i = 0; i < requiredColumns.length; i++) {
             if (requiredColumns[i]) {
-                result += i;
-                result += ",";
+                sb.append(fields[i].getName()).append(",");
             }
         }
-        if(result.endsWith(",")) {
-            result = result.substring(0, result.length()-1);
+        if (sb.length() > 0) { // drop the trailing comma; no required column yields ""
+            sb.deleteCharAt(sb.length() - 1);
         }
-        return result;
+        return sb.toString();
     }
 
     @Override
@@ -342,19 +404,6 @@ public class OrcStorage extends LoadFunc
         return p;
     }
 
-    private static TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException {
-        FileSystem fs = FileSystem.get(job.getConfiguration());
-        Path path = getFirstFile(location, fs);
-        if (path == null) {
-            log.info("Cannot find any ORC files from " + location +
-                    ". Probably multiple load store in script.");
-            return null;
-        }
-        Reader reader = OrcFile.createReader(fs, path);
-        ObjectInspector oip = (ObjectInspector)reader.getObjectInspector();
-        return TypeInfoUtils.getTypeInfoFromObjectInspector(oip);
-    }
-
     @Override
     public ResourceSchema getSchema(String location, Job job)
             throws IOException {
@@ -372,11 +421,8 @@ public class OrcStorage extends LoadFunc
 
     private TypeInfo getTypeInfo(String location, Job job) throws IOException {
         Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
-        String serializedStr = p.getProperty(signature + SchemaSignatureSuffix);
-        TypeInfo typeInfo = null;
-        if (serializedStr != null) {
-            typeInfo = (TypeInfo) ObjectSerializer.deserialize(serializedStr);
-        } else {
+        TypeInfo typeInfo = (TypeInfo) ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix));
+        if (typeInfo == null) {
             typeInfo = getTypeInfoFromLocation(location, job);
         }
         if (typeInfo != null) {
@@ -385,6 +431,19 @@ public class OrcStorage extends LoadFunc
         return typeInfo;
     }
 
+    private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException {
+        FileSystem fs = FileSystem.get(job.getConfiguration());
+        Path firstOrcFile = getFirstFile(location, fs);
+        if (firstOrcFile == null) {
+            log.info("Cannot find any ORC files from " + location +
+                    ". Probably multiple load store in script.");
+            return null;
+        }
+        // The schema comes from the object inspector of the file chosen by getFirstFile
+        ObjectInspector inspector = (ObjectInspector) OrcFile.createReader(fs, firstOrcFile).getObjectInspector();
+        return TypeInfoUtils.getTypeInfoFromObjectInspector(inspector);
+    }
+
     @Override
     public ResourceStatistics getStatistics(String location, Job job)
             throws IOException {
@@ -431,4 +490,199 @@ public class OrcStorage extends LoadFunc
         return new RequiredFieldResponse(true);
     }
 
+    @Override
+    public List<String> getPredicateFields(String location, Job job) throws IOException {
+        ResourceSchema schema = getSchema(location, job);
+        if (schema == null) {
+            return null; // no readable ORC schema, so no field supports pushdown
+        }
+        List<String> predicateFields = new ArrayList<String>();
+        for (ResourceFieldSchema field : schema.getFields()) {
+            switch(field.getType()) {
+            case DataType.INTEGER:
+            case DataType.LONG:
+            case DataType.FLOAT:
+            case DataType.DOUBLE:
+            case DataType.DATETIME:
+            case DataType.BYTEARRAY:
+            case DataType.CHARARRAY:
+            case DataType.BIGINTEGER:
+            case DataType.BIGDECIMAL:
+                predicateFields.add(field.getName());
+                break;
+            default:
+                // Skip BOOLEAN (TODO: SearchArgument throws an error for boolean values), TUPLE, MAP and BAG
+                break;
+            }
+        }
+        return predicateFields;
+    }
+
+    @Override
+    public List<OpType> getSupportedExpressionTypes() {
+        // Every operator that buildSearchArgument can translate into an ORC SearchArgument;
+        // a new ArrayList is built on each call.
+        return new ArrayList<OpType>(Arrays.asList(
+                // comparisons
+                OpType.OP_EQ, OpType.OP_NE,
+                OpType.OP_GT, OpType.OP_GE,
+                OpType.OP_LT, OpType.OP_LE,
+                // value-list and range tests
+                OpType.OP_IN, OpType.OP_BETWEEN,
+                // unary operators
+                OpType.OP_NULL, OpType.OP_NOT,
+                // logical connectives
+                OpType.OP_AND, OpType.OP_OR
+        ));
+    }
+
+    @Override
+    public void setPushdownPredicate(Expression expr) throws IOException {
+        SearchArgument sArg = getSearchArgument(expr);
+        if (sArg != null) {
+            log.info("Pushdown predicate expression is " + expr);
+            log.info("Pushdown predicate SearchArgument is:\n" + sArg);
+            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
+            try {
+                p.setProperty(signature + SearchArgsSuffix, sArg.toKryo());
+            } catch (Exception e) {
+                throw new IOException("Cannot serialize SearchArgument: " + sArg, e); // keep the Kryo failure as cause
+            }
+        }
+    }
+
+    @VisibleForTesting
+    SearchArgument getSearchArgument(Expression expr) {
+        if (expr == null) {
+            return null;
+        }
+        // A lone leaf predicate (not AND/OR/NOT at the root) is wrapped in an AND
+        boolean wrapInAnd = !expr.getOpType().equals(OpType.OP_AND) && !expr.getOpType().equals(OpType.OP_OR) && !expr.getOpType().equals(OpType.OP_NOT);
+        Builder sargBuilder = SearchArgument.FACTORY.newBuilder();
+        if (wrapInAnd) {
+            sargBuilder.startAnd();
+        }
+        buildSearchArgument(expr, sargBuilder);
+        if (wrapInAnd) {
+            sargBuilder.end();
+        }
+        return sargBuilder.build();
+    }
+
+    /** Recursively translates a Pig {@link Expression} tree into calls on the ORC SearchArgument builder. */
+    private void buildSearchArgument(Expression expr, Builder builder) {
+        if (expr instanceof BinaryExpression) {
+            Expression lhs = ((BinaryExpression) expr).getLhs();
+            Expression rhs = ((BinaryExpression) expr).getRhs();
+            switch (expr.getOpType()) {
+            case OP_AND:
+                builder.startAnd();
+                buildSearchArgument(lhs, builder);
+                buildSearchArgument(rhs, builder);
+                builder.end();
+                break;
+            case OP_OR:
+                builder.startOr();
+                buildSearchArgument(lhs, builder);
+                buildSearchArgument(rhs, builder);
+                builder.end();
+                break;
+            case OP_EQ:
+                builder.equals(getColumnName(lhs), getExpressionValue(rhs));
+                break;
+            case OP_NE:
+                builder.startNot();
+                builder.equals(getColumnName(lhs), getExpressionValue(rhs));
+                builder.end();
+                break;
+            case OP_LT:
+                builder.lessThan(getColumnName(lhs), getExpressionValue(rhs));
+                break;
+            case OP_LE:
+                builder.lessThanEquals(getColumnName(lhs), getExpressionValue(rhs));
+                break;
+            case OP_GT:
+                builder.startNot();
+                builder.lessThanEquals(getColumnName(lhs), getExpressionValue(rhs));
+                builder.end();
+                break;
+            case OP_GE:
+                builder.startNot();
+                builder.lessThan(getColumnName(lhs), getExpressionValue(rhs));
+                builder.end();
+                break;
+            case OP_BETWEEN:
+                builder.between(getColumnName(lhs), getSearchArgObjValue(((BetweenExpression) rhs).getLower()), getSearchArgObjValue(((BetweenExpression) rhs).getUpper()));
+                break;
+            case OP_IN:
+                builder.in(getColumnName(lhs), getSearchArgObjValues(((InExpression) rhs).getValues()).toArray());
+                break;
+            default:
+                throw new RuntimeException("Unsupported binary expression type: " + expr.getOpType() + " in " + expr);
+            }
+        } else if (expr instanceof UnaryExpression) {
+            Expression unaryExpr = ((UnaryExpression) expr).getExpression();
+            switch (expr.getOpType()) {
+            case OP_NULL:
+                builder.isNull(getColumnName(unaryExpr));
+                break;
+            case OP_NOT:
+                builder.startNot();
+                buildSearchArgument(unaryExpr, builder);
+                builder.end();
+                break;
+            default:
+                throw new RuntimeException("Unsupported unary expression type: " + expr.getOpType() + " in " + expr);
+            }
+        } else {
+            throw new RuntimeException("Unsupported expression type: " + expr.getOpType() + " in " + expr);
+        }
+    }
+
+    private String getColumnName(Expression expr) {
+        // The operand of a leaf predicate must be a Column; check explicitly instead of catching a failed cast
+        if (!(expr instanceof Column)) {
+            throw new RuntimeException("Expected a Column but found " + expr.getClass().getName() +
+                    " in expression " + expr);
+        }
+        return ((Column) expr).getName();
+    }
+
+    private Object getExpressionValue(Expression expr) {
+        switch (expr.getOpType()) {
+        case TERM_CONST:
+            return getSearchArgObjValue(((Const) expr).getValue());
+        case TERM_COL:
+            // NOTE(review): the column's name is returned as though it were a literal value; confirm col-vs-col predicates never reach here
+            return ((Column) expr).getName();
+        }
+        throw new RuntimeException("Unsupported expression type: " + expr.getOpType() + " in " + expr);
+    }
+
+    /**
+     * Converts every value of an IN list via {@link #getSearchArgObjValue(Object)}; an empty list yields an empty list.
+     */
+    private List<Object> getSearchArgObjValues(List<Object> values) {
+        List<Object> newValues = new ArrayList<Object>(values.size());
+        for (Object value : values) {
+            newValues.add(getSearchArgObjValue(value));
+        }
+        return newValues;
+    }
+
+    private Object getSearchArgObjValue(Object value) {
+        // TODO Test BigInteger, BigDecimal and DateTime
+        if (value instanceof BigInteger) {
+            return HiveDecimal.create(((BigInteger)value));
+        } else if (value instanceof BigDecimal) {
+            return HiveDecimal.create(((BigDecimal)value), false);
+        } else if (value instanceof DateTime) {
+            // Floor rather than truncate so instants before 1970-01-01 map to the preceding day.
+            // TODO is this right based on what DateWritable.dateToDays() does? What about pig.datetime.default.tz?
+            return new DateWritable((int) Math.floor(((DateTime) value).getMillis() / (double) TimeUnit.DAYS.toMillis(1)));
+        }
+        // TODO compare to Orc schema and change type for varchar, typecast for byte, short
+        return value;
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java Mon Aug  4 21:35:47 2014
@@ -18,7 +18,6 @@
 
 package org.apache.pig.newplan;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -29,11 +28,13 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.newplan.logical.expression.AddExpression;
 import org.apache.pig.newplan.logical.expression.AndExpression;
 import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
 import org.apache.pig.newplan.logical.expression.ConstantExpression;
 import org.apache.pig.newplan.logical.expression.DivideExpression;
 import org.apache.pig.newplan.logical.expression.EqualExpression;
 import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
 import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.IsNullExpression;
 import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
 import org.apache.pig.newplan.logical.expression.LessThanExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
@@ -41,29 +42,16 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.ModExpression;
 import org.apache.pig.newplan.logical.expression.MultiplyExpression;
 import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
 import org.apache.pig.newplan.logical.expression.OrExpression;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
 import org.apache.pig.newplan.logical.expression.RegexExpression;
 import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.expression.UnaryExpression;
 
-/**
- * This is a rewrite of {@code PColFilterExtractor}
- *
- * We traverse the expression plan bottom up and separate it into two plans
- * - pushdownExprPlan, plan that can be pushed down to the loader and
- * - filterExprPlan, remaining plan that needs to be evaluated by pig
- *
- */
-public class FilterExtractor {
-
-    private static final Log LOG = LogFactory.getLog(FilterExtractor.class);
-
-    /**
-     * partition columns associated with the table
-     * present in the load on which the filter whose
-     * inner plan is being visited is applied
-     */
-    private List<String> partitionCols;
+public abstract class FilterExtractor {
+
+    protected final Log LOG = LogFactory.getLog(getClass());
 
     /**
      * We visit this plan to create the filteredPlan
@@ -83,12 +71,12 @@ public class FilterExtractor {
     /**
      * Final filterExpr after we are done
      */
-    private LogicalExpression filterExpr = null;
+    protected LogicalExpression filterExpr = null;
 
     /**
      * @{code Expression} to pushdown
      */
-    private Expression pushdownExpr = null;
+    protected Expression pushdownExpr = null;
 
     /**
      *
@@ -96,10 +84,8 @@ public class FilterExtractor {
      * @param partitionCols list of partition columns of the table which is
      * being loaded in the LOAD statement which is input to the filter
      */
-    public FilterExtractor(LogicalExpressionPlan plan,
-            List<String> partitionCols) {
+    public FilterExtractor(LogicalExpressionPlan plan) {
+        // Which columns and operators may be pushed down is decided by the
+        // subclass (checkPushDown(ProjectExpression) and isSupportedOpType),
+        // so only the plan to split is needed here.
+        // NOTE(review): the Javadoc on this constructor still documents the
+        // removed partitionCols parameter; it should describe plan instead.
         this.originalPlan = plan;
-        this.partitionCols = new ArrayList<String>(partitionCols);
+        // Receives copies of the sub-expressions that cannot be pushed down
+        // (see addToFilterPlan).
         this.filteredPlan = new LogicalExpressionPlan();
         this.pushdownExprPlan = new LogicalExpressionPlan();
     }
@@ -141,20 +127,19 @@ public class FilterExtractor {
     }
 
     /**
-     * @return the condition on partition columns extracted from filter
+     * Returns the part of the filter condition that can be pushed down to
+     * the loader, converted to an {@link Expression}.
+     *
+     * @return the pushdown condition extracted from the filter, or null if
+     *         none is available (callers should check for null)
      */
-    public  Expression getPColCondition(){
+    public  Expression getPushDownExpression(){
         return pushdownExpr;
     }
 
-    private class KeyState {
+    protected class KeyState {
         LogicalExpression pushdownExpr;
         LogicalExpression filterExpr;
     }
 
-    private KeyState checkPushDown(LogicalExpression op) throws FrontendException {
-        // Note: Currently, Expression interface only understands 3 Expression Types
-        // (Look at getExpression below) BinaryExpression, ProjectExpression and ConstantExpression
+    protected KeyState checkPushDown(LogicalExpression op) throws FrontendException {
+        // Note: the Expression interface currently only understands the expression types handled below
         if(op instanceof ProjectExpression) {
             return checkPushDown((ProjectExpression)op);
         } else if (op instanceof BinaryExpression) {
@@ -165,6 +150,10 @@ public class FilterExtractor {
             state.pushdownExpr = op;
             state.filterExpr = null;
             return state;
+        } else if(op instanceof CastExpression) {
+            return checkPushDown(((CastExpression)op).getExpression());
+        } else if (op instanceof UnaryExpression) {
+            return checkPushDown((UnaryExpression) op);
         } else {
             KeyState state = new KeyState();
             state.pushdownExpr = null;
@@ -173,7 +162,7 @@ public class FilterExtractor {
         }
     }
 
-    private LogicalExpression addToFilterPlan(LogicalExpression op) throws FrontendException {
+    protected LogicalExpression addToFilterPlan(LogicalExpression op) throws FrontendException {
         // This copies the whole tree underneath op
         LogicalExpression newOp = op.deepCopy(filteredPlan);
         return newOp;
@@ -213,8 +202,14 @@ public class FilterExtractor {
         return orOp;
     }
 
-    private KeyState checkPushDown(BinaryExpression binExpr) throws FrontendException {
+    protected KeyState checkPushDown(BinaryExpression binExpr) throws FrontendException {
         KeyState state = new KeyState();
+
+        if (!isSupportedOpType(binExpr)) {
+            state.filterExpr = addToFilterPlan(binExpr);
+            state.pushdownExpr = null;
+            return state;
+        }
         KeyState leftState = checkPushDown(binExpr.getLhs());
         KeyState rightState = checkPushDown(binExpr.getRhs());
 
@@ -273,19 +268,32 @@ public class FilterExtractor {
         return state;
     }
 
-    private KeyState checkPushDown(ProjectExpression project) throws FrontendException {
-        String fieldName = project.getFieldSchema().alias;
+    /**
+     * Decides whether a unary expression can be pushed down. If the subclass
+     * reports the operator as supported, an IS NULL or NOT expression is pushed
+     * down as a whole. Any other unary expression, and any unsupported one, is
+     * copied into the filtered plan.
+     * <p>
+     * NOTE(review): the operand of IS NULL / NOT is not inspected here. An
+     * expression over a column that the subclass would not push down (see
+     * {@code checkPushDown(ProjectExpression)}) is therefore still pushed down
+     * whole. Consider pushing only when the operand is itself fully pushable,
+     * taking care to discard any copies the operand check adds to the filtered plan.
+     *
+     * @param unaryExpr unary expression to analyse
+     * @return push-down / filter split for {@code unaryExpr}
+     * @throws FrontendException propagated from addToFilterPlan
+     */
+    protected KeyState checkPushDown(UnaryExpression unaryExpr) throws FrontendException {
         KeyState state = new KeyState();
-        if(partitionCols.contains(fieldName)) {
-            state.filterExpr = null;
-            state.pushdownExpr = project;
+        if (isSupportedOpType(unaryExpr)) {
+            if (unaryExpr instanceof IsNullExpression) {
+                state.pushdownExpr = unaryExpr;
+                state.filterExpr = null;
+            } else if (unaryExpr instanceof NotExpression) {
+                state.pushdownExpr = unaryExpr;
+                state.filterExpr = null;
+            } else {
+                // Any other unary operator stays in Pig's filter.
+                state.filterExpr = addToFilterPlan(unaryExpr);
+                state.pushdownExpr = null;
+            }
         } else {
+            // Operator not supported by the subclass: evaluate it in Pig.
-            state.filterExpr = addToFilterPlan(project);
+            state.filterExpr = addToFilterPlan(unaryExpr);
             state.pushdownExpr = null;
         }
         return state;
     }
 
+    /**
+     * Reports whether the operator of {@code binOp} may be pushed down. When
+     * this returns false, the whole binary expression is kept in Pig's filter
+     * without its operands being inspected.
+     *
+     * @param binOp binary expression whose operator is checked
+     * @return true if the operator may be pushed down
+     */
+    protected abstract boolean isSupportedOpType(BinaryExpression binOp);
+
+    /**
+     * Reports whether the operator of {@code unaryOp} may be pushed down. When
+     * this returns false, the unary expression is kept in Pig's filter.
+     *
+     * @param unaryOp unary expression whose operator is checked
+     * @return true if the operator may be pushed down
+     */
+    protected abstract boolean isSupportedOpType(UnaryExpression unaryOp);
+
+    /**
+     * Decides whether a column reference may be pushed down. Implementations
+     * set {@code pushdownExpr} to {@code project} when the column can be pushed
+     * down. Otherwise they set {@code filterExpr}, typically via
+     * {@link #addToFilterPlan(LogicalExpression)}.
+     *
+     * @param project column reference from the filter condition
+     * @return push-down / filter split for {@code project}
+     * @throws FrontendException if the column cannot be copied into the filtered plan
+     */
+    protected abstract KeyState checkPushDown(ProjectExpression project) throws FrontendException;
+
     /**
      * Assume that the given operator is already disconnected from its predecessors.
      * @param op
@@ -311,7 +319,7 @@ public class FilterExtractor {
         filteredPlan.remove( op );
     }
 
-    public static Expression getExpression(LogicalExpression op) throws FrontendException
+    public Expression getExpression(LogicalExpression op) throws FrontendException
     {
         if(op == null) {
             return null;
@@ -323,11 +331,7 @@ public class FilterExtractor {
             ProjectExpression projExpr = (ProjectExpression)op;
             String fieldName = projExpr.getFieldSchema().alias;
             return new Expression.Column(fieldName);
-        } else {
-            if( !( op instanceof BinaryExpression ) ) {
-                LOG.error("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
-                throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
-            }
+        } else if(op instanceof BinaryExpression) {
             BinaryExpression binOp = (BinaryExpression)op;
             if(binOp instanceof AddExpression) {
                 return getExpression( binOp, OpType.OP_PLUS );
@@ -358,15 +362,35 @@ public class FilterExtractor {
             } else if(binOp instanceof RegexExpression) {
                 return getExpression(binOp, OpType.OP_MATCH);
             } else {
-                LOG.error("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
-                throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+                LOG.error("Unsupported conversion of BinaryExpression to Expression: " + op.getName());
+                throw new FrontendException("Unsupported conversion of BinaryExpression to Expression: " + op.getName());
             }
+        } else if(op instanceof UnaryExpression) {
+            UnaryExpression unaryOp = (UnaryExpression)op;
+            if(unaryOp instanceof IsNullExpression) {
+                return getExpression(unaryOp, OpType.OP_NULL);
+            } else if(unaryOp instanceof NotExpression) {
+                return getExpression(unaryOp, OpType.OP_NOT);
+            } else if(unaryOp instanceof CastExpression) {
+                return getExpression(unaryOp.getExpression());
+            } else {
+                LOG.error("Unsupported conversion of UnaryExpression to Expression: " + op.getName());
+                throw new FrontendException("Unsupported conversion of UnaryExpression to Expression: " + op.getName());
+            }
+        } else {
+            LOG.error("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+            throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
         }
     }
 
-    private static Expression getExpression(BinaryExpression binOp, OpType
+    /**
+     * Converts a binary logical expression into an
+     * {@link Expression.BinaryExpression} by recursively converting both operands.
+     *
+     * @param binOp  binary expression to convert
+     * @param opType operator type recorded on the result
+     * @return the converted expression
+     * @throws FrontendException if either operand cannot be converted
+     */
+    protected Expression getExpression(BinaryExpression binOp, OpType
             opType) throws FrontendException {
         return new Expression.BinaryExpression(getExpression(binOp.getLhs())
                 , getExpression(binOp.getRhs()), opType);
     }
+
+    /**
+     * Converts a unary logical expression (such as IS NULL or NOT) into an
+     * {@link Expression.UnaryExpression} whose operand is converted recursively.
+     *
+     * @param unaryOp unary expression to convert
+     * @param opType  operator type recorded on the result
+     * @return the converted expression
+     * @throws FrontendException if the operand cannot be converted
+     */
+    protected Expression getExpression(UnaryExpression unaryOp, OpType opType)
+            throws FrontendException {
+        Expression operand = getExpression(unaryOp.getExpression());
+        return new Expression.UnaryExpression(operand, opType);
+    }
 }

Added: pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java?rev=1615805&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java Mon Aug  4 21:35:47 2014
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.UnaryExpression;
+
+/**
+ * Extracts the part of a filter condition that references only partition
+ * columns, so it can be pushed down to a {@code LoadMetadata} loader as a
+ * partition filter. This is a rewrite of {@code PColFilterExtractor}.
+ * <p>
+ * We traverse the expression plan bottom up and separate it into two plans:
+ * <ul>
+ * <li>pushdownExprPlan, the plan that can be pushed down to the loader, and</li>
+ * <li>filterExprPlan, the remaining plan that needs to be evaluated by Pig.</li>
+ * </ul>
+ * Every binary operator is accepted, so whether a binary expression is pushed
+ * down is decided by its operands. Unary operators (IS NULL, NOT) are never
+ * pushed down.
+ */
+public class PartitionFilterExtractor extends FilterExtractor {
+
+    /**
+     * Partition columns of the table read by the LOAD statement whose output
+     * is filtered. This is a private copy of the constructor argument.
+     */
+    private final List<String> partitionCols;
+
+    /**
+     * @param plan logical plan corresponding to the filter's comparison condition
+     * @param partitionCols list of partition columns of the table which is
+     * being loaded in the LOAD statement which is input to the filter. The list
+     * is copied, so later changes to the caller's list have no effect.
+     */
+    public PartitionFilterExtractor(LogicalExpressionPlan plan,
+            List<String> partitionCols) {
+        super(plan);
+        this.partitionCols = new ArrayList<String>(partitionCols);
+    }
+
+    /**
+     * A column reference can be pushed down only if it names a partition column.
+     * Otherwise a copy of it is added to the filtered plan.
+     */
+    @Override
+    protected KeyState checkPushDown(ProjectExpression project) throws FrontendException {
+        String fieldName = project.getFieldSchema().alias;
+        KeyState state = new KeyState();
+        if (partitionCols.contains(fieldName)) {
+            state.filterExpr = null;
+            state.pushdownExpr = project;
+        } else {
+            state.filterExpr = addToFilterPlan(project);
+            state.pushdownExpr = null;
+        }
+        return state;
+    }
+
+    /** All binary operators are accepted; pushability is decided by the operands. */
+    @Override
+    protected boolean isSupportedOpType(BinaryExpression binOp) {
+        return true;
+    }
+
+    /** Unary operators such as IS NULL and NOT are never pushed down as partition filters. */
+    @Override
+    protected boolean isSupportedOpType(UnaryExpression unaryOp) {
+        return false;
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java?rev=1615805&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java Mon Aug  4 21:35:47 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.expression.AddExpression;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.DivideExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.IsNullExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LessThanExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ModExpression;
+import org.apache.pig.newplan.logical.expression.MultiplyExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.expression.UnaryExpression;
+
+/**
+ * Splits a filter condition for a loader that supports predicate pushdown.
+ * <p>
+ * We traverse the expression plan bottom up and separate it into two plans:
+ * <ul>
+ * <li>pushdownExprPlan, the plan that can be pushed down to the loader, and</li>
+ * <li>filterExprPlan, the remaining plan that needs to be evaluated by Pig.</li>
+ * </ul>
+ * If the predicate is not removable then filterExprPlan will not have
+ * the pushdownExprPlan removed.
+ * <p>
+ * A column reference is pushable only if it is in {@code predicateCols}. An
+ * operator is pushable only if its {@link OpType} is in {@code supportedOpTypes}.
+ */
+public class PredicatePushDownFilterExtractor extends FilterExtractor {
+
+    /** Columns that may appear in a pushed-down predicate (private copy). */
+    private final List<String> predicateCols;
+
+    /** Operator types that may appear in a pushed-down predicate (private copy). */
+    private final List<OpType> supportedOpTypes;
+
+    /**
+     * @param plan logical plan corresponding to the filter's condition
+     * @param predicateCols columns that may be pushed down; null is treated as empty
+     * @param supportedOpTypes operator types that may be pushed down; null is
+     *        treated as empty
+     */
+    public PredicatePushDownFilterExtractor(LogicalExpressionPlan plan, List<String> predicateCols,
+            List<OpType> supportedOpTypes) {
+        super(plan);
+        // Copy the arguments (as PartitionFilterExtractor does) so later changes to the
+        // caller's lists cannot affect extraction. Tolerate null instead of throwing a
+        // NullPointerException during visit().
+        this.predicateCols = predicateCols == null
+                ? new ArrayList<String>() : new ArrayList<String>(predicateCols);
+        this.supportedOpTypes = supportedOpTypes == null
+                ? new ArrayList<OpType>() : new ArrayList<OpType>(supportedOpTypes);
+    }
+
+    @Override
+    public void visit() throws FrontendException {
+        super.visit();
+        // TODO: if supportedOpTypes contains OP_BETWEEN, collapse multiple ORs into BETWEEN;
+        // otherwise, if it contains OP_IN, collapse multiple ORs into IN.
+    }
+
+    /**
+     * A column reference can be pushed down only if it is one of the predicate
+     * columns. Otherwise a copy of it is added to the filtered plan.
+     */
+    @Override
+    protected KeyState checkPushDown(ProjectExpression project) throws FrontendException {
+        String fieldName = project.getFieldSchema().alias;
+        KeyState state = new KeyState();
+        if (predicateCols.contains(fieldName)) {
+            state.filterExpr = null;
+            state.pushdownExpr = project;
+        } else {
+            state.filterExpr = addToFilterPlan(project);
+            state.pushdownExpr = null;
+        }
+        return state;
+    }
+
+    /** Maps the logical binary operator to its {@link OpType} and checks it is supported. */
+    @Override
+    protected boolean isSupportedOpType(BinaryExpression binOp) {
+        if (binOp instanceof AddExpression) {
+            return supportedOpTypes.contains(OpType.OP_PLUS);
+        } else if (binOp instanceof SubtractExpression) {
+            return supportedOpTypes.contains(OpType.OP_MINUS);
+        } else if (binOp instanceof MultiplyExpression) {
+            return supportedOpTypes.contains(OpType.OP_TIMES);
+        } else if (binOp instanceof DivideExpression) {
+            return supportedOpTypes.contains(OpType.OP_DIV);
+        } else if (binOp instanceof ModExpression) {
+            return supportedOpTypes.contains(OpType.OP_MOD);
+        } else if (binOp instanceof AndExpression) {
+            return supportedOpTypes.contains(OpType.OP_AND);
+        } else if (binOp instanceof OrExpression) {
+            return supportedOpTypes.contains(OpType.OP_OR);
+        } else if (binOp instanceof EqualExpression) {
+            return supportedOpTypes.contains(OpType.OP_EQ);
+        } else if (binOp instanceof NotEqualExpression) {
+            return supportedOpTypes.contains(OpType.OP_NE);
+        } else if (binOp instanceof GreaterThanExpression) {
+            return supportedOpTypes.contains(OpType.OP_GT);
+        } else if (binOp instanceof GreaterThanEqualExpression) {
+            return supportedOpTypes.contains(OpType.OP_GE);
+        } else if (binOp instanceof LessThanExpression) {
+            return supportedOpTypes.contains(OpType.OP_LT);
+        } else if (binOp instanceof LessThanEqualExpression) {
+            return supportedOpTypes.contains(OpType.OP_LE);
+        } else if (binOp instanceof RegexExpression) {
+            return supportedOpTypes.contains(OpType.OP_MATCH);
+        } else {
+            return false;
+        }
+    }
+
+    /** Only IS NULL and NOT can be pushed down, and only if their {@link OpType} is supported. */
+    @Override
+    protected boolean isSupportedOpType(UnaryExpression unaryOp) {
+        if (unaryOp instanceof IsNullExpression) {
+            return supportedOpTypes.contains(OpType.OP_NULL);
+        } else if (unaryOp instanceof NotExpression) {
+            return supportedOpTypes.contains(OpType.OP_NOT);
+        } else {
+            return false;
+        }
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Mon Aug  4 21:35:47 2014
@@ -39,6 +39,7 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.logical.rules.MergeFilter;
 import org.apache.pig.newplan.logical.rules.MergeForEach;
 import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
+import org.apache.pig.newplan.logical.rules.PredicatePushdownOptimizer;
 import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
 import org.apache.pig.newplan.logical.rules.PushUpFilter;
 import org.apache.pig.newplan.logical.rules.SplitFilter;
@@ -132,6 +133,15 @@ public class LogicalPlanOptimizer extend
         if (!s.isEmpty())
             ls.add(s);
 
+        // Predicate pushdown set
+        // This set of rules pushes filter conditions down to the LoadFunc
+        s = new HashSet<Rule>();
+        // Optimize predicate pushdown
+        r = new PredicatePushdownOptimizer("PredicatePushdownOptimizer");
+        checkAndAddRule(s, r);
+        if (!s.isEmpty())
+            ls.add(s);
+
         // PushDownForEachFlatten set
         s = new HashSet<Rule>();
         // Add the PushDownForEachFlatten

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Mon Aug  4 21:35:47 2014
@@ -35,7 +35,7 @@ import org.apache.pig.newplan.FilterExtr
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.OperatorSubPlan;
-import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.PartitionFilterExtractor;
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
@@ -46,9 +46,9 @@ import org.apache.pig.newplan.optimizer.
 
 public class PartitionFilterOptimizer extends Rule {
     private String[] partitionKeys;
-    
+
     /**
-     * a reference to the LoadMetada implementation 
+     * a reference to the LoadMetadata implementation
      */
     private LoadMetadata loadMetadata;
 
@@ -56,23 +56,23 @@ public class PartitionFilterOptimizer ex
      * a reference to the LoadFunc implementation
      */
     private LoadFunc loadFunc;
-    
+
     private LOLoad loLoad;
     private LOFilter loFilter;
-    
+
     /**
-     * a map between column names as reported in 
+     * a map between column names as reported in
      * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}
-     * and as present in {@link LOLoad#getSchema()}. The two will be different 
+     * and as present in {@link LOLoad#getSchema()}. The two will be different
      * when the user has provided a schema in the load statement
      */
     private Map<String, String> colNameMap = new HashMap<String, String>();
-    
+
     /**
      * a map between column nameas as present in {@link LOLoad#getSchema()} and
-     * as reported in 
+     * as reported in
      * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}.
-     * The two will be different when the user has provided a schema in the 
+     * The two will be different when the user has provided a schema in the
      * load statement.
      */
     private Map<String, String> reverseColNameMap = new HashMap<String, String>();
@@ -109,17 +109,17 @@ public class PartitionFilterOptimizer ex
             if( succeds == null || succeds.size() == 0 || !( succeds.get(0) instanceof LOFilter ) )
                 return false;
             loFilter =  (LOFilter)succeds.get(0);
-            
+
             // Filter has dependency other than load, skip optimization
             if (currentPlan.getSoftLinkPredecessors(loFilter)!=null)
                 return false;
-            
+
             // we have to check more only if LoadFunc implements LoadMetada
             loadFunc = loLoad.getLoadFunc();
             if(!( loadFunc instanceof LoadMetadata ) ) {
                 return false;
             }
-            
+
             loadMetadata = (LoadMetadata)loadFunc;
             try {
 				partitionKeys = loadMetadata.getPartitionKeys(
@@ -130,7 +130,7 @@ public class PartitionFilterOptimizer ex
             if( partitionKeys == null || partitionKeys.length == 0 ) {
                 return false;
             }
-            
+
             return true;
         }
 
@@ -144,11 +144,11 @@ public class PartitionFilterOptimizer ex
         	subPlan = new OperatorSubPlan( currentPlan );
 
         	setupColNameMaps();
-        	
-        	FilterExtractor filterFinder = new FilterExtractor(
-                    loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
+
+            FilterExtractor filterFinder = new PartitionFilterExtractor(loFilter.getFilterPlan(),
+                    getMappedKeys(partitionKeys));
             filterFinder.visit();
-            Expression partitionFilter = filterFinder.getPColCondition();
+            Expression partitionFilter = filterFinder.getPushDownExpression();
 
             if(partitionFilter != null) {
                 // the column names in the filter may be the ones provided by
@@ -168,7 +168,7 @@ public class PartitionFilterOptimizer ex
                 }
             }
         }
-        
+
         protected void updateMappedColNames(Expression expr) {
             if(expr instanceof BinaryExpression) {
                 updateMappedColNames(((BinaryExpression) expr).getLhs());
@@ -180,7 +180,7 @@ public class PartitionFilterOptimizer ex
         }
 
         /**
-         * The partition keys in the argument are as reported by 
+         * The partition keys in the argument are as reported by
          * {@link LoadMetadata#getPartitionKeys(String, org.apache.hadoop.conf.Configuration)}.
          * The user may have renamed these by providing a schema with different names
          * in the load statement - this method will replace the former names with
@@ -203,9 +203,9 @@ public class PartitionFilterOptimizer ex
                 colNameMap.put(loadFuncSchema.getField(i).alias,
                         (i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
                             loadFuncSchema.getField(i).alias));
-                
+
                 reverseColNameMap.put((i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
-                            loadFuncSchema.getField(i).alias), 
+                            loadFuncSchema.getField(i).alias),
                             loadFuncSchema.getField(i).alias);
             }
         }

Added: pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java?rev=1615805&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java Mon Aug  4 21:35:47 2014
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPredicatePushdown;
+import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.Expression.Column;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PredicatePushDownFilterExtractor;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+/**
+ * Optimizer rule that offers the filter immediately following a load to the
+ * load function, when that load function implements
+ * {@link LoadPredicatePushdown}.
+ * <p>
+ * The part of the filter condition that the loader can evaluate (as decided by
+ * {@link PredicatePushDownFilterExtractor} from the loader's predicate fields
+ * and supported expression types) is translated back to the loader's column
+ * names and passed to {@link LoadPredicatePushdown#setPushdownPredicate}.
+ * The filter itself is left in the plan, so the pushed predicate acts only as
+ * a hint to the loader.
+ */
+public class PredicatePushdownOptimizer extends Rule {
+
+    public PredicatePushdownOptimizer(String name) {
+        super(name, false);
+    }
+
+    /**
+     * Matches any single {@link LOLoad}; the remaining conditions are
+     * checked by the transformer.
+     */
+    @Override
+    protected OperatorPlan buildPattern() {
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator load = new LOLoad(null, plan);
+        plan.add(load);
+        return plan;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new PredicatePushDownTransformer();
+    }
+
+    class PredicatePushDownTransformer extends Transformer {
+
+        private LOLoad loLoad;
+        private LOFilter loFilter;
+
+        private LoadFunc loadFunc;
+        private LoadPredicatePushdown loadPredPushdown;
+
+        /** Predicate field names as reported by the loader for the matched load. */
+        private List<String> predicateFields;
+
+        /**
+         * A map between column names as reported in
+         * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}
+         * and as present in {@link LOLoad#getSchema()}. The two will be different
+         * when the user has provided a schema in the load statement.
+         */
+        private Map<String, String> colNameMap = new HashMap<String, String>();
+
+        /**
+         * A map between column names as present in {@link LOLoad#getSchema()} and
+         * as reported in
+         * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}.
+         * The two will be different when the user has provided a schema in the
+         * load statement.
+         */
+        private Map<String, String> reverseColNameMap = new HashMap<String, String>();
+
+        /**
+         * Accepts the match only when the load's first successor is an
+         * {@link LOFilter} without soft-link predecessors, the load function
+         * implements {@link LoadPredicatePushdown}, and it reports at least one
+         * predicate field for the load location.
+         */
+        @Override
+        public boolean check(OperatorPlan matched) throws FrontendException {
+            loLoad = (LOLoad) matched.getSources().get(0);
+
+            // Match filter.
+            List<Operator> succeds = currentPlan.getSuccessors(loLoad);
+            if (succeds == null || succeds.isEmpty() || !(succeds.get(0) instanceof LOFilter)) {
+                return false;
+            }
+            loFilter = (LOFilter) succeds.get(0);
+
+            // Filter has dependency other than load, skip optimization
+            if (currentPlan.getSoftLinkPredecessors(loFilter) != null) {
+                return false;
+            }
+
+            // we have to check more only if LoadFunc implements LoadPredicatePushdown
+            loadFunc = loLoad.getLoadFunc();
+            if (!(loadFunc instanceof LoadPredicatePushdown)) {
+                return false;
+            }
+            loadPredPushdown = (LoadPredicatePushdown) loadFunc;
+
+            try {
+                predicateFields = loadPredPushdown.getPredicateFields(
+                        loLoad.getFileSpec().getFileName(), new Job(loLoad.getConfiguration()));
+            } catch (IOException e) {
+                throw new FrontendException(e);
+            }
+            return predicateFields != null && !predicateFields.isEmpty();
+        }
+
+        /**
+         * Always returns {@code null}: the predicate is only a hint to the loader
+         * and the filter is not removed, so the plan has not changed.
+         */
+        @Override
+        public OperatorPlan reportChanges() {
+            // TODO: implement filter removal and report the modified sub-plan.
+            return null;
+        }
+
+        /**
+         * Extracts the pushable part of the filter condition, renames its columns
+         * to the loader's names, and hands it to the loader.
+         */
+        @Override
+        public void transform(OperatorPlan matched) throws FrontendException {
+            setupColNameMaps();
+            if (colNameMap.isEmpty()) {
+                // No loader schema: predicate fields cannot be matched to columns.
+                return;
+            }
+
+            PredicatePushDownFilterExtractor filterFinder = new PredicatePushDownFilterExtractor(
+                    loFilter.getFilterPlan(), getMappedKeys(predicateFields),
+                    loadPredPushdown.getSupportedExpressionTypes());
+            filterFinder.visit();
+            Expression pushDownPredicate = filterFinder.getPushDownExpression();
+            if (pushDownPredicate == null) {
+                return;
+            }
+
+            // The column names in the filter may be the ones provided by the user
+            // in the schema of the load statement - replace them with the names
+            // the loader reported in its own schema.
+            updateMappedColNames(pushDownPredicate);
+            try {
+                loadPredPushdown.setPushdownPredicate(pushDownPredicate);
+            } catch (IOException e) {
+                throw new FrontendException(e);
+            }
+        }
+
+        /**
+         * Renames, in place, each {@link Column} reachable through nested
+         * {@link BinaryExpression} operands from its load-statement name to the
+         * loader's name. A column with no known mapping keeps its current name.
+         * @param expr the expression to rewrite
+         */
+        protected void updateMappedColNames(Expression expr) {
+            if (expr instanceof BinaryExpression) {
+                BinaryExpression binExpr = (BinaryExpression) expr;
+                updateMappedColNames(binExpr.getLhs());
+                updateMappedColNames(binExpr.getRhs());
+            } else if (expr instanceof Column) {
+                Column col = (Column) expr;
+                String loaderName = reverseColNameMap.get(col.getName());
+                if (loaderName != null) {
+                    col.setName(loaderName);
+                }
+            }
+            // NOTE(review): other composite Expression types, if the extractor can
+            // produce them, are not descended into - confirm against Expression.
+        }
+
+        /**
+         * The predicate fields in the argument are as reported by
+         * {@link LoadPredicatePushdown#getPredicateFields(String, Job)}.
+         * The user may have renamed these by providing a schema with different names
+         * in the load statement - this method will replace the former names with
+         * the latter names. Fields absent from the loader's schema are dropped.
+         * @param predicateFields predicate field names as reported by the loader
+         * @return the corresponding column names as used in the load statement
+         */
+        protected List<String> getMappedKeys(List<String> predicateFields) {
+            List<String> mappedKeys = new ArrayList<String>(predicateFields.size());
+            for (String field : predicateFields) {
+                String mapped = colNameMap.get(field);
+                if (mapped != null) {
+                    mappedKeys.add(mapped);
+                }
+            }
+            return mappedKeys;
+        }
+
+        /**
+         * Rebuilds {@link #colNameMap} and {@link #reverseColNameMap} for the
+         * current load by pairing field i of the loader's schema with field i of
+         * the load statement's schema (or with itself when the latter is shorter).
+         * Both maps are left empty when the loader reported no schema.
+         */
+        protected void setupColNameMaps() throws FrontendException {
+            // Fields of this transformer hold per-match state; discard any
+            // mappings left over from a previously transformed load.
+            colNameMap.clear();
+            reverseColNameMap.clear();
+
+            LogicalSchema loLoadSchema = loLoad.getSchema();
+            LogicalSchema loadFuncSchema = loLoad.getDeterminedSchema();
+            if (loadFuncSchema == null) {
+                return;
+            }
+            int userFieldCount = (loLoadSchema == null) ? 0 : loLoadSchema.size();
+            for (int i = 0; i < loadFuncSchema.size(); i++) {
+                String loaderName = loadFuncSchema.getField(i).alias;
+                String userName = (i < userFieldCount) ? loLoadSchema.getField(i).alias : loaderName;
+                colNameMap.put(loaderName, userName);
+                reverseColNameMap.put(userName, loaderName);
+            }
+        }
+
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java Mon Aug  4 21:35:47 2014
@@ -33,30 +33,30 @@ import org.apache.pig.newplan.OperatorPl
  * transform or until maxIter iterations (default 500) has been made over
  * the RuleSet.  Then the next RuleSet will be moved to.  Once finished,
  * a given RuleSet is never returned to.
- * 
+ *
  * Each rule is has two parts:  a pattern and and associated transformer.
  * Transformers have two important functions:   check(), and transform().
  * The pattern describes a pattern of node types that the optimizer will
  * look to match.  If that match is found anywhere in the plan, then check()
- * will be called.  check() allows the rule to look more in depth at the 
+ * will be called.  check() allows the rule to look more in depth at the
  * matched pattern and decide whether the rule should be run or not.  For
  * example, one might design a rule to push filters above join that would
  * look for the pattern filter(join) (meaning a filter followed by a join).
- * But only certain types of filters can be pushed.  The check() function 
+ * But only certain types of filters can be pushed.  The check() function
  * would need to decide whether the filter that it found was pushable or not.
  * If check() returns true, the rule is said to have matched, and transform()
  * is then called.  This function is responsible for making changes in the
  * logical plan.  Once transform is complete PlanPatcher.patchUp will be
- * called to do any necessary cleanup in the plan, such as resetting 
+ * called to do any necessary cleanup in the plan, such as resetting
  * schemas, etc.
  */
 public abstract class PlanOptimizer {
- 
+
     protected List<Set<Rule>> ruleSets;
     protected OperatorPlan plan;
     protected List<PlanTransformListener> listeners;
     protected int maxIter;
-    
+
     static final int defaultIterations = 500;
 
     /**
@@ -66,16 +66,16 @@ public abstract class PlanOptimizer {
      * set to -1 for default
      */
     protected PlanOptimizer(OperatorPlan p,
-                            List<Set<Rule>> rs,                            
+                            List<Set<Rule>> rs,
                             int iterations) {
         plan = p;
         ruleSets = rs;
         listeners = new ArrayList<PlanTransformListener>();
         maxIter = (iterations < 1 ? defaultIterations : iterations);
     }
-    
+
     /**
-     * Adds a listener to the optimization.  This listener will be fired 
+     * Adds a listener to the optimization.  This listener will be fired
      * after each rule transforms a plan.  Listeners are guaranteed to
      * be fired in the order they are added.
      * @param listener
@@ -83,13 +83,13 @@ public abstract class PlanOptimizer {
     protected void addPlanTransformListener(PlanTransformListener listener) {
         listeners.add(listener);
     }
-    
+
     /**
      * Run the optimizer.  This method attempts to match each of the Rules
      * against the plan.  If a Rule matches, it then calls the check
      * method of the associated Transformer to give the it a chance to
      * check whether it really wants to do the optimization.  If that
-     * returns true as well, then Transformer.transform is called. 
+     * returns true as well, then Transformer.transform is called.
      * @throws FrontendException
      */
     public void optimize() throws FrontendException {
@@ -108,9 +108,12 @@ public abstract class PlanOptimizer {
                                 if (transformer.check(m)) {
                                     sawMatch = true;
                                     transformer.transform(m);
-                                    if (!rule.isSkipListener()) {
+                                    OperatorPlan change = transformer.reportChanges();
+                                    if (change == null) {
+                                        sawMatch = false;
+                                    } else if (!rule.isSkipListener()) {
                                         for(PlanTransformListener l: listeners) {
-                                            l.transformed(plan, transformer.reportChanges());
+                                            l.transformed(plan, change);
                                         }
                                     }
                                 }

Modified: pig/trunk/test/org/apache/pig/builtin/TestOrcStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestOrcStorage.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestOrcStorage.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestOrcStorage.java Mon Aug  4 21:35:47 2014
@@ -65,6 +65,7 @@ import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestOrcStorage {
@@ -82,13 +83,8 @@ public class TestOrcStorage {
     private static PigServer pigServer = null;
     private static FileSystem fs;
 
-    @Before
-    public void setup() throws ExecException, IOException {
-        pigServer = new PigServer(ExecType.LOCAL);
-        fs = FileSystem.get(ConfigurationUtil.toConfiguration(pigServer.getPigContext().getProperties()));
-        deleteTestFiles();
-        pigServer.mkdirs(outbasedir);
-        generateInputFiles();
+    @BeforeClass
+    public static void oneTimeSetup(){
         if(Util.WINDOWS){
             INPUT1 = INPUT1.replace("\\", "/");
             OUTPUT1 = OUTPUT1.replace("\\", "/");
@@ -98,6 +94,15 @@ public class TestOrcStorage {
         }
     }
 
+    @Before
+    public void setup() throws ExecException, IOException {
+        pigServer = new PigServer(ExecType.LOCAL);
+        fs = FileSystem.get(ConfigurationUtil.toConfiguration(pigServer.getPigContext().getProperties()));
+        deleteTestFiles();
+        pigServer.mkdirs(outbasedir);
+        generateInputFiles();
+    }
+
     @After
     public void teardown() throws IOException {
         if(pigServer != null) {
@@ -234,7 +239,6 @@ public class TestOrcStorage {
         fs.delete(new Path(OUTPUT4, "_SUCCESS"), true);
 
         pigServer.registerQuery("A = load '" + OUTPUT4 + "' using OrcStorage();" );
-        Schema s = pigServer.dumpSchema("A");
         Iterator<Tuple> iter = pigServer.openIterator("A");
         Tuple t = iter.next();
         assertTrue(t.toString().startsWith("(false,1,1024,65536,9223372036854775807,1.0,-15.0," +
@@ -313,6 +317,7 @@ public class TestOrcStorage {
 
     }
 
+    @SuppressWarnings("rawtypes")
     private void compareData(Object expected, Object actual) {
         if (expected instanceof Text) {
             assertEquals(String.class, actual.getClass());