You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/08/04 23:35:47 UTC
svn commit: r1615805 [1/2] - in /pig/trunk: ./ ivy/
shims/test/hadoop20/org/apache/pig/test/
shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/
src/org/apache/pig/builtin/ src/org/apache/pig/newplan/
src/org/apache/pig/newplan/logical/optimiz...
Author: daijy
Date: Mon Aug 4 21:35:47 2014
New Revision: 1615805
URL: http://svn.apache.org/r1615805
Log:
PIG-4091: Predicate pushdown for ORC
Added:
pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java
pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java
pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
pig/trunk/test/org/apache/pig/builtin/TestOrcStoragePushdown.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/ivy.xml
pig/trunk/ivy/libraries.properties
pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
pig/trunk/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java
pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
pig/trunk/src/org/apache/pig/Expression.java
pig/trunk/src/org/apache/pig/Main.java
pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java
pig/trunk/test/org/apache/pig/builtin/TestOrcStorage.java
pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
pig/trunk/test/org/apache/pig/test/TestNewPartitionFilterPushDown.java
pig/trunk/test/org/apache/pig/test/Util.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Aug 4 21:35:47 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4091: Predicate pushdown for ORC (rohini via daijy)
+
PIG-4077: Some fixes and e2e test for OrcStorage (rohini)
PIG-4054: Do not create job.jar when submitting job (daijy)
Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Mon Aug 4 21:35:47 2014
@@ -420,6 +420,8 @@
conf="hadoop20->master" />
<dependency org="org.iq80.snappy" name="snappy" rev="${snappy.version}"
conf="test->master" />
+ <dependency org="com.esotericsoftware.kryo" name="kryo" rev="${kryo.version}"
+ conf="test->master" />
<dependency org="org.vafer" name="jdeb" rev="${jdeb.version}"
conf="compile->master">
Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Mon Aug 4 21:35:47 2014
@@ -67,6 +67,7 @@ json-simple.version=1.1
junit.version=4.11
jruby.version=1.6.7
jython.version=2.5.3
+kryo.version=2.22
rhino.version=1.7R2
antlr.version=3.4
stringtemplate.version=4.0.4
Modified: pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java Mon Aug 4 21:35:47 2014
@@ -43,7 +43,7 @@ public class MiniCluster extends MiniGen
}
@Override
- protected ExecType getExecType() {
+ public ExecType getExecType() {
return ExecType.MAPREDUCE;
}
Modified: pig/trunk/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java Mon Aug 4 21:35:47 2014
@@ -29,7 +29,7 @@ public class TezMiniCluster extends Mini
}
@Override
- protected ExecType getExecType() {
+ public ExecType getExecType() {
throw new UnsupportedOperationException();
}
Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java Mon Aug 4 21:35:47 2014
@@ -54,7 +54,7 @@ public class MiniCluster extends MiniGen
}
@Override
- protected ExecType getExecType() {
+ public ExecType getExecType() {
return ExecType.MAPREDUCE;
}
Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Mon Aug 4 21:35:47 2014
@@ -54,7 +54,7 @@ public class TezMiniCluster extends Mini
private Configuration m_mr_conf = null;
@Override
- protected ExecType getExecType() {
+ public ExecType getExecType() {
return TEZ;
}
Modified: pig/trunk/src/org/apache/pig/Expression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Expression.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Expression.java (original)
+++ pig/trunk/src/org/apache/pig/Expression.java Mon Aug 4 21:35:47 2014
@@ -17,6 +17,8 @@
*/
package org.apache.pig;
+import java.util.List;
+
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -47,6 +49,14 @@ public abstract class Expression {
OP_LE(" <= "),
OP_MATCH(" matches "),
+ //Only used by PredicatePushdown, not used by PartitionPushdown
+ OP_IN (" in "),
+ OP_BETWEEN (" between "),
+
+ //unary ops
+ OP_NULL(" is null"),
+ OP_NOT(" not"),
+
//binary logical
OP_AND(" and "),
OP_OR(" or "),
@@ -76,8 +86,86 @@ public abstract class Expression {
return opType;
}
+ //TODO: Apply a optimizer to Expression from PredicatePushdownOptimizer and
+ // convert OR clauses to BETWEEN OR IN
+ /**
+  * Expression of type {@code OP_BETWEEN} holding the lower and upper bound of a range.
+  */
+ public static class BetweenExpression extends Expression {
+
+     private Object lowerBound;
+     private Object upperBound;
+
+     /**
+      * @param lower lower bound of the range
+      * @param upper upper bound of the range
+      */
+     public BetweenExpression(Object lower, Object upper) {
+         this.opType = OpType.OP_BETWEEN;
+         this.lowerBound = lower;
+         this.upperBound = upper;
+     }
+
+     /** @return the lower bound given at construction */
+     public Object getLower() {
+         return lowerBound;
+     }
+
+     /** @return the upper bound given at construction */
+     public Object getUpper() {
+         return upperBound;
+     }
+
+     @Override
+     public String toString() {
+         StringBuilder text = new StringBuilder(" between ");
+         text.append(lowerBound).append(" and ").append(upperBound);
+         return text.toString();
+     }
+
+ }
+
+ /**
+  * Expression of type {@code OP_IN} holding the list of candidate values.
+  */
+ public static class InExpression extends Expression {
+
+     private List<Object> values;
+
+     /**
+      * @param values candidate values; the list is stored as given, not copied
+      */
+     public InExpression(List<Object> values) {
+         this.opType = OpType.OP_IN;
+         this.values = values;
+     }
+
+     /** @return the list of values given at construction */
+     public List<Object> getValues() {
+         return values;
+     }
+
+     /**
+      * Renders the values as {@code " in (v1, v2, ...)"}; String values are wrapped
+      * in single quotes. An empty list renders as {@code " in ()"}.
+      */
+     @Override
+     public String toString() {
+         StringBuilder sb = new StringBuilder();
+         sb.append(" in (");
+         // Emit the separator before each element except the first, so that no
+         // dangling ", " has to be trimmed afterwards (the old trimming removed only
+         // the trailing space and ate the "(" when the list was empty).
+         String separator = "";
+         for (Object value : values) {
+             sb.append(separator);
+             if (value instanceof String) {
+                 sb.append("'").append(value).append("'");
+             } else {
+                 sb.append(value);
+             }
+             separator = ", ";
+         }
+         sb.append(")");
+         return sb.toString();
+     }
+
+ }
+
+ /**
+  * Expression that applies a single operator (given as an {@code OpType}) to one
+  * operand expression.
+  */
+ public static class UnaryExpression extends Expression {
+     Expression expr;
+
+     /**
+      * @param expr operand expression
+      * @param opType operator applied to the operand
+      */
+     public UnaryExpression(Expression expr, OpType opType) {
+         this.expr = expr;
+         this.opType = opType;
+     }
+
+     /** @return the operand expression */
+     public Expression getExpression() {
+         return expr;
+     }
+
+     @Override
+     public String toString() {
+         // TODO: Change toString() for OP_NOT to say (col is not null)
+         // instead of ((col is null) not). If any one relies on expr.toString() might be useful
+         StringBuilder text = new StringBuilder("(");
+         text.append(expr.toString());
+         text.append(opType.toString());
+         return text.append(")").toString();
+     }
+
+ }
public static class BinaryExpression extends Expression {
Added: pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java?rev=1615805&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java (added)
+++ pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java Mon Aug 4 21:35:47 2014
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * This interface defines how a loader can support predicate pushdown.
+ * If a given loader implements this interface, Pig will push down predicates based on
+ * the types of operations supported by the loader on a given set of fields.
+ * @since Pig 0.14
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface LoadPredicatePushdown {
+ /**
+ * Find what fields of the data can support predicate pushdown.
+ * @param location Location as returned by
+ * {@link LoadFunc#relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)}
+ *
+ * @param job The {@link Job} object - this should be used only to obtain
+ * cluster properties through {@link Job#getConfiguration()} and not to set/query
+ * any runtime job information.
+ *
+ * @return list of field names that can be pushed down. Implementations
+ * should return null to indicate that there are no fields that support predicate pushdown.
+ *
+ * @throws IOException if an exception occurs while retrieving predicate fields
+ */
+ List<String> getPredicateFields(String location, Job job)
+ throws IOException;
+
+ /**
+ * Indicate the operations on fields that the loader supports for predicate pushdown.
+ *
+ * @return List of operations supported by the predicate pushdown loader
+ */
+ List<Expression.OpType> getSupportedExpressionTypes();
+
+ /**
+ * Push down an expression to the loader.
+ *
+ * @param predicate expression to be filtered by the loader.
+ * @throws IOException if an exception occurs while setting the predicate on the loader
+ */
+ void setPushdownPredicate(Expression predicate) throws IOException;
+
+}
+
Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Mon Aug 4 21:35:47 2014
@@ -28,7 +28,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
-import java.io.Reader;
import java.io.StringReader;
import java.io.Writer;
import java.text.ParseException;
@@ -872,7 +871,8 @@ public class Main {
System.out.println(" ColumnMapKeyPrune - Remove unused data");
System.out.println(" AddForEach - Add ForEach to remove unneeded columns");
System.out.println(" MergeForEach - Merge adjacent ForEach");
- System.out.println(" GroupByConstParallelSetter - Force parallel 1 for \"group all\" statement");
+ System.out.println(" PartitionFilterOptimizer - Pushdown partition filter conditions to loader implementing LoadMetaData");
+ System.out.println(" PredicatePushdownOptimizer - Pushdown filter predicates to loader implementing LoadPredicatePushDown");
System.out.println(" All - Disable all optimizations");
System.out.println(" All optimizations listed here are enabled by default. Optimization values are case insensitive.");
System.out.println(" -v, -verbose - Print all error messages to screen");
Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Mon Aug 4 21:35:47 2014
@@ -18,10 +18,13 @@
package org.apache.pig.builtin;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -34,6 +37,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
@@ -43,7 +47,10 @@ import org.apache.hadoop.hive.ql.io.orc.
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.Version;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -56,13 +63,21 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pig.Expression;
+import org.apache.pig.Expression.BetweenExpression;
+import org.apache.pig.Expression.Column;
+import org.apache.pig.Expression.Const;
+import org.apache.pig.Expression.InExpression;
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.Expression.UnaryExpression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPredicatePushdown;
import org.apache.pig.LoadPushDown;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
+import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFuncInterface;
@@ -75,6 +90,9 @@ import org.apache.pig.impl.util.ObjectSe
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.orc.OrcUtils;
+import org.joda.time.DateTime;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* A load function and store function for ORC file.
@@ -92,7 +110,11 @@ import org.apache.pig.impl.util.orc.OrcU
* <li><code>-v, --version</code> Sets the version of the file that will be written
* </ul>
**/
-public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown {
+public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown {
+
+ //TODO Make OrcInputFormat.SARG_PUSHDOWN visible
+ private static final String SARG_PUSHDOWN = "sarg.pushdown";
+
protected RecordReader in = null;
protected RecordWriter writer = null;
private TypeInfo typeInfo = null;
@@ -114,6 +136,7 @@ public class OrcStorage extends LoadFunc
private static final String SchemaSignatureSuffix = "_schema";
private static final String RequiredColumnsSuffix = "_columns";
+ private static final String SearchArgsSuffix = "_sarg";
static {
validOptions = new Options();
@@ -267,28 +290,67 @@ public class OrcStorage extends LoadFunc
if (typeInfo != null && oi == null) {
oi = OrcStruct.createObjectInspector(typeInfo);
}
- if (!UDFContext.getUDFContext().isFrontend() &&
- p.getProperty(signature+RequiredColumnsSuffix)!=null) {
- mRequiredColumns = (boolean[])ObjectSerializer.deserialize(p.getProperty(signature+RequiredColumnsSuffix));
- job.getConfiguration().setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
- getReqiredColumnIdString(mRequiredColumns));
+ if (!UDFContext.getUDFContext().isFrontend()) {
+ if (p.getProperty(signature + RequiredColumnsSuffix) != null) {
+ mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p
+ .getProperty(signature + RequiredColumnsSuffix));
+ job.getConfiguration().setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
+ getReqiredColumnIdString(mRequiredColumns));
+ if (p.getProperty(signature + SearchArgsSuffix) != null) {
+ // Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
+ job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
+ getReqiredColumnNamesString(getSchema(location, job), mRequiredColumns));
+ }
+ } else if (p.getProperty(signature + SearchArgsSuffix) != null) {
+ // Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
+ job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
+ getReqiredColumnNamesString(getSchema(location, job)));
+ }
+ if (p.getProperty(signature + SearchArgsSuffix) != null) {
+ job.getConfiguration().set(SARG_PUSHDOWN, p.getProperty(signature + SearchArgsSuffix));
+ }
+
}
FileInputFormat.setInputPaths(job, location);
}
- private static String getReqiredColumnIdString(boolean[] requiredColumns) {
- String result = "";
- for (int i=0;i<requiredColumns.length;i++) {
+ /**
+  * Builds the comma-separated list of indexes of the required columns.
+  *
+  * @param requiredColumns one flag per column; {@code true} marks the column as required
+  * @return the indexes of the flagged columns joined with ","; an empty string when
+  *         no column is flagged
+  */
+ private String getReqiredColumnIdString(boolean[] requiredColumns) {
+     StringBuilder sb = new StringBuilder();
+     for (int i = 0; i < requiredColumns.length; i++) {
+         if (requiredColumns[i]) {
+             sb.append(i).append(",");
+         }
+     }
+     // Check the length first: charAt(length() - 1) on an empty builder throws
+     // StringIndexOutOfBoundsException when no column is flagged.
+     if (sb.length() > 0) {
+         sb.deleteCharAt(sb.length() - 1);
+     }
+     return sb.toString();
+ }
+
+ /**
+  * Builds the comma-separated list of the names of all fields in the schema.
+  *
+  * @param schema schema whose field names are listed
+  * @return the field names joined with ","; an empty string when the schema has no fields
+  */
+ private String getReqiredColumnNamesString(ResourceSchema schema) {
+     StringBuilder sb = new StringBuilder();
+     for (ResourceFieldSchema field : schema.getFields()) {
+         sb.append(field.getName()).append(",");
+     }
+     // Check the length first: charAt(length() - 1) on an empty builder throws
+     // StringIndexOutOfBoundsException when the schema has no fields.
+     if (sb.length() > 0) {
+         sb.deleteCharAt(sb.length() - 1);
+     }
+     return sb.toString();
+ }
+
+ /**
+ * Builds the comma-separated list of the names of the required fields.
+ *
+ * @param schema schema whose fields are indexed by position
+ * @param requiredColumns one flag per column; true marks the column as required
+ * @return the names of the flagged fields joined with ","; an empty string when
+ * no column is flagged
+ */
+ private String getReqiredColumnNamesString(ResourceSchema schema, boolean[] requiredColumns) {
+ StringBuilder sb = new StringBuilder();
+ ResourceFieldSchema[] fields = schema.getFields();
+ for (int i = 0; i < requiredColumns.length; i++) {
if (requiredColumns[i]) {
- result += i;
- result += ",";
+ // Append the field name, as the single-argument overload does, not the field schema object
+ sb.append(fields[i].getName()).append(",");
}
}
- if(result.endsWith(",")) {
- result = result.substring(0, result.length()-1);
+ // Check the length first: charAt(length() - 1) throws on an empty builder
+ if (sb.length() > 0) {
+ sb.deleteCharAt(sb.length() - 1);
}
- return result;
+ return sb.toString();
}
@Override
@@ -342,19 +404,6 @@ public class OrcStorage extends LoadFunc
return p;
}
- private static TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException {
- FileSystem fs = FileSystem.get(job.getConfiguration());
- Path path = getFirstFile(location, fs);
- if (path == null) {
- log.info("Cannot find any ORC files from " + location +
- ". Probably multiple load store in script.");
- return null;
- }
- Reader reader = OrcFile.createReader(fs, path);
- ObjectInspector oip = (ObjectInspector)reader.getObjectInspector();
- return TypeInfoUtils.getTypeInfoFromObjectInspector(oip);
- }
-
@Override
public ResourceSchema getSchema(String location, Job job)
throws IOException {
@@ -372,11 +421,8 @@ public class OrcStorage extends LoadFunc
private TypeInfo getTypeInfo(String location, Job job) throws IOException {
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
- String serializedStr = p.getProperty(signature + SchemaSignatureSuffix);
- TypeInfo typeInfo = null;
- if (serializedStr != null) {
- typeInfo = (TypeInfo) ObjectSerializer.deserialize(serializedStr);
- } else {
+ TypeInfo typeInfo = (TypeInfo) ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix));
+ if (typeInfo == null) {
typeInfo = getTypeInfoFromLocation(location, job);
}
if (typeInfo != null) {
@@ -385,6 +431,19 @@ public class OrcStorage extends LoadFunc
return typeInfo;
}
+ /**
+  * Derives the type information from the ORC file that {@code getFirstFile} returns
+  * for the given location.
+  *
+  * @param location load location
+  * @param job job whose configuration is used to obtain the file system
+  * @return the type information of that file, or null when no file is found
+  * @throws IOException if the file system or the ORC reader cannot be accessed
+  */
+ private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException {
+     FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+     Path firstOrcFile = getFirstFile(location, fileSystem);
+     if (firstOrcFile != null) {
+         ObjectInspector inspector =
+                 (ObjectInspector) OrcFile.createReader(fileSystem, firstOrcFile).getObjectInspector();
+         return TypeInfoUtils.getTypeInfoFromObjectInspector(inspector);
+     }
+     log.info("Cannot find any ORC files from " + location +
+             ". Probably multiple load store in script.");
+     return null;
+ }
+
@Override
public ResourceStatistics getStatistics(String location, Job job)
throws IOException {
@@ -431,4 +490,199 @@ public class OrcStorage extends LoadFunc
return new RequiredFieldResponse(true);
}
+ /**
+  * Lists the names of the schema fields whose type is eligible for predicate pushdown.
+  */
+ @Override
+ public List<String> getPredicateFields(String location, Job job) throws IOException {
+     ResourceSchema schema = getSchema(location, job);
+     List<String> pushableFields = new ArrayList<String>();
+     ResourceFieldSchema[] columns = schema.getFields();
+     for (int i = 0; i < columns.length; i++) {
+         ResourceFieldSchema column = columns[i];
+         switch (column.getType()) {
+         case DataType.INTEGER:
+         case DataType.LONG:
+         case DataType.FLOAT:
+         case DataType.DOUBLE:
+         case DataType.DATETIME:
+         case DataType.BYTEARRAY:
+         case DataType.CHARARRAY:
+         case DataType.BIGINTEGER:
+         case DataType.BIGDECIMAL:
+             pushableFields.add(column.getName());
+             break;
+         default:
+             // Skip DataType.TUPLE, DataType.MAP and DataType.BAG.
+             // DataType.BOOLEAN is skipped as well.
+             //TODO: Need to find what to set for boolean. Throws error if SearchArgument value is set as boolean
+             break;
+         }
+     }
+     return pushableFields;
+ }
+
+ /**
+  * Lists the operator types this loader accepts in a pushed-down predicate.
+  */
+ @Override
+ public List<OpType> getSupportedExpressionTypes() {
+     // Copy into an ArrayList so the caller still receives a modifiable list
+     return new ArrayList<OpType>(Arrays.asList(
+             OpType.OP_EQ,
+             OpType.OP_NE,
+             OpType.OP_GT,
+             OpType.OP_GE,
+             OpType.OP_LT,
+             OpType.OP_LE,
+             OpType.OP_IN,
+             OpType.OP_BETWEEN,
+             OpType.OP_NULL,
+             OpType.OP_NOT,
+             OpType.OP_AND,
+             OpType.OP_OR));
+ }
+
+ /**
+  * Converts the predicate to a SearchArgument and stores its Kryo-serialized form in
+  * the UDF properties under {@code signature + SearchArgsSuffix}. Nothing is stored
+  * when no SearchArgument is produced.
+  *
+  * @param expr predicate to push down
+  * @throws IOException if the SearchArgument cannot be serialized; the original
+  *         failure is attached as the cause
+  */
+ @Override
+ public void setPushdownPredicate(Expression expr) throws IOException {
+     SearchArgument sArg = getSearchArgument(expr);
+     if (sArg != null) {
+         log.info("Pushdown predicate expression is " + expr);
+         log.info("Pushdown predicate SearchArgument is:\n" + sArg);
+         Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
+         try {
+             p.setProperty(signature + SearchArgsSuffix, sArg.toKryo());
+         } catch (Exception e) {
+             // Keep the underlying exception as the cause so the failure can be diagnosed
+             throw new IOException("Cannot serialize SearchArgument: " + sArg, e);
+         }
+     }
+ }
+
+ /**
+  * Builds the SearchArgument for a pushdown expression. When the root of the
+  * expression is not AND, OR or NOT, the translated predicate is wrapped in an
+  * AND group.
+  *
+  * @param expr expression to translate, may be null
+  * @return the SearchArgument, or null when {@code expr} is null
+  */
+ @VisibleForTesting
+ SearchArgument getSearchArgument(Expression expr) {
+     if (expr == null) {
+         return null;
+     }
+     Builder sargBuilder = SearchArgument.FACTORY.newBuilder();
+     OpType rootType = expr.getOpType();
+     boolean wrapInAnd = !rootType.equals(OpType.OP_AND)
+             && !rootType.equals(OpType.OP_OR)
+             && !rootType.equals(OpType.OP_NOT);
+     if (wrapInAnd) {
+         sargBuilder.startAnd();
+         buildSearchArgument(expr, sargBuilder);
+         sargBuilder.end();
+     } else {
+         buildSearchArgument(expr, sargBuilder);
+     }
+     return sargBuilder.build();
+ }
+
+ /**
+  * Recursively translates a pushdown expression tree into calls on the
+  * SearchArgument builder.
+  * <p>
+  * AND, OR and NOT become nested start/end groups. NE, GT and GE are emitted as the
+  * negation of equals, lessThanEquals and lessThan respectively.
+  *
+  * @param expr expression to translate; must be a BinaryExpression or a UnaryExpression
+  * @param builder builder that receives the translated predicate
+  * @throws RuntimeException if the expression or its operator type is not supported
+  */
+ private void buildSearchArgument(Expression expr, Builder builder) {
+     if (expr instanceof BinaryExpression) {
+         Expression lhs = ((BinaryExpression) expr).getLhs();
+         Expression rhs = ((BinaryExpression) expr).getRhs();
+         switch (expr.getOpType()) {
+         case OP_AND:
+             builder.startAnd();
+             buildSearchArgument(lhs, builder);
+             buildSearchArgument(rhs, builder);
+             builder.end();
+             break;
+         case OP_OR:
+             builder.startOr();
+             buildSearchArgument(lhs, builder);
+             buildSearchArgument(rhs, builder);
+             builder.end();
+             break;
+         case OP_EQ:
+             builder.equals(getColumnName(lhs), getExpressionValue(rhs));
+             break;
+         case OP_NE:
+             builder.startNot();
+             builder.equals(getColumnName(lhs), getExpressionValue(rhs));
+             builder.end();
+             break;
+         case OP_LT:
+             builder.lessThan(getColumnName(lhs), getExpressionValue(rhs));
+             break;
+         case OP_LE:
+             builder.lessThanEquals(getColumnName(lhs), getExpressionValue(rhs));
+             break;
+         case OP_GT:
+             // a > b is expressed as not (a <= b)
+             builder.startNot();
+             builder.lessThanEquals(getColumnName(lhs), getExpressionValue(rhs));
+             builder.end();
+             break;
+         case OP_GE:
+             // a >= b is expressed as not (a < b)
+             builder.startNot();
+             builder.lessThan(getColumnName(lhs), getExpressionValue(rhs));
+             builder.end();
+             break;
+         case OP_BETWEEN:
+             BetweenExpression between = (BetweenExpression) rhs;
+             builder.between(getColumnName(lhs), getSearchArgObjValue(between.getLower()), getSearchArgObjValue(between.getUpper()));
+             // Without this break BETWEEN fell through into the OP_IN cast below
+             break;
+         case OP_IN:
+             InExpression in = (InExpression) rhs;
+             builder.in(getColumnName(lhs), getSearchArgObjValues(in.getValues()).toArray());
+             // Without this break IN fell through into the default branch and always threw
+             break;
+         default:
+             throw new RuntimeException("Unsupported binary expression type: " + expr.getOpType() + " in " + expr);
+         }
+     } else if (expr instanceof UnaryExpression) {
+         Expression unaryExpr = ((UnaryExpression) expr).getExpression();
+         switch (expr.getOpType()) {
+         case OP_NULL:
+             builder.isNull(getColumnName(unaryExpr));
+             break;
+         case OP_NOT:
+             builder.startNot();
+             buildSearchArgument(unaryExpr, builder);
+             builder.end();
+             break;
+         default:
+             throw new RuntimeException("Unsupported unary expression type: " +
+                     expr.getOpType() + " in " + expr);
+         }
+     } else {
+         throw new RuntimeException("Unsupported expression type: " + expr.getOpType() + " in " + expr);
+     }
+ }
+
+ /**
+  * Returns the name of the column that the expression refers to.
+  *
+  * @param expr expression expected to be a Column
+  * @return the column name
+  * @throws RuntimeException if the expression is not a Column
+  */
+ private String getColumnName(Expression expr) {
+     Column column;
+     try {
+         column = (Column) expr;
+     } catch (ClassCastException cce) {
+         String foundClass = expr.getClass().getName();
+         throw new RuntimeException("Expected a Column but found " + foundClass +
+                 " in expression " + expr, cce);
+     }
+     return column.getName();
+ }
+
+ /**
+  * Extracts the operand value of a terminal expression: the column name for a column,
+  * or the converted constant value for a constant.
+  *
+  * @param expr terminal expression (column or constant)
+  * @return the value to hand to the SearchArgument builder
+  * @throws RuntimeException if the expression is neither a column nor a constant
+  */
+ private Object getExpressionValue(Expression expr) {
+     Object operand;
+     switch (expr.getOpType()) {
+     case TERM_CONST:
+         operand = getSearchArgObjValue(((Const) expr).getValue());
+         break;
+     case TERM_COL:
+         operand = ((Column) expr).getName();
+         break;
+     default:
+         throw new RuntimeException("Unsupported expression type: " + expr.getOpType() + " in " + expr);
+     }
+     return operand;
+ }
+
+ /**
+  * Converts every value of a list with {@code getSearchArgObjValue}.
+  *
+  * @param values raw values; may be empty
+  * @return a new list holding the converted values in the same order
+  */
+ private List<Object> getSearchArgObjValues(List<Object> values) {
+     // Convert each element individually instead of probing values.get(0): this
+     // handles an empty list and returns the converted list (the old code built
+     // the converted list but returned the unconverted input).
+     List<Object> newValues = new ArrayList<Object>(values.size());
+     for (Object value : values) {
+         newValues.add(getSearchArgObjValue(value));
+     }
+     return newValues;
+ }
+
+ /**
+  * Converts a value to the object handed to the SearchArgument builder:
+  * BigInteger and BigDecimal become HiveDecimal, DateTime becomes DateWritable,
+  * anything else is returned unchanged.
+  *
+  * @param value value to convert
+  * @return the converted value
+  */
+ private Object getSearchArgObjValue(Object value) {
+     // TODO Test BigInteger, BigDecimal and DateTime
+     if (value instanceof BigInteger) {
+         BigInteger bigInteger = (BigInteger) value;
+         return HiveDecimal.create(bigInteger);
+     }
+     if (value instanceof BigDecimal) {
+         BigDecimal bigDecimal = (BigDecimal) value;
+         return HiveDecimal.create(bigDecimal, false);
+     }
+     if (value instanceof DateTime) {
+         //TODO is this right based on what DateTimeWritable.dateToDays() does? What about pig.datetime.default.tz?
+         long millis = ((DateTime) value).getMillis();
+         long days = millis / TimeUnit.DAYS.toMillis(1);
+         return new DateWritable((int) days);
+     }
+     //TODO compare to Orc schema and change type for varchar, typecast for byte, short
+     return value;
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/FilterExtractor.java Mon Aug 4 21:35:47 2014
@@ -18,7 +18,6 @@
package org.apache.pig.newplan;
-import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -29,11 +28,13 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.newplan.logical.expression.AddExpression;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.DivideExpression;
import org.apache.pig.newplan.logical.expression.EqualExpression;
import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.IsNullExpression;
import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
import org.apache.pig.newplan.logical.expression.LessThanExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
@@ -41,29 +42,16 @@ import org.apache.pig.newplan.logical.ex
import org.apache.pig.newplan.logical.expression.ModExpression;
import org.apache.pig.newplan.logical.expression.MultiplyExpression;
import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
import org.apache.pig.newplan.logical.expression.OrExpression;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.expression.RegexExpression;
import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.expression.UnaryExpression;
-/**
- * This is a rewrite of {@code PColFilterExtractor}
- *
- * We traverse the expression plan bottom up and separate it into two plans
- * - pushdownExprPlan, plan that can be pushed down to the loader and
- * - filterExprPlan, remaining plan that needs to be evaluated by pig
- *
- */
-public class FilterExtractor {
-
- private static final Log LOG = LogFactory.getLog(FilterExtractor.class);
-
- /**
- * partition columns associated with the table
- * present in the load on which the filter whose
- * inner plan is being visited is applied
- */
- private List<String> partitionCols;
+public abstract class FilterExtractor {
+
+ protected final Log LOG = LogFactory.getLog(getClass());
/**
* We visit this plan to create the filteredPlan
@@ -83,12 +71,12 @@ public class FilterExtractor {
/**
* Final filterExpr after we are done
*/
- private LogicalExpression filterExpr = null;
+ protected LogicalExpression filterExpr = null;
/**
* @{code Expression} to pushdown
*/
- private Expression pushdownExpr = null;
+ protected Expression pushdownExpr = null;
/**
*
@@ -96,10 +84,8 @@ public class FilterExtractor {
* @param partitionCols list of partition columns of the table which is
* being loaded in the LOAD statement which is input to the filter
*/
- public FilterExtractor(LogicalExpressionPlan plan,
- List<String> partitionCols) {
+ public FilterExtractor(LogicalExpressionPlan plan) {
this.originalPlan = plan;
- this.partitionCols = new ArrayList<String>(partitionCols);
this.filteredPlan = new LogicalExpressionPlan();
this.pushdownExprPlan = new LogicalExpressionPlan();
}
@@ -141,20 +127,19 @@ public class FilterExtractor {
}
/**
- * @return the condition on partition columns extracted from filter
+ * @return the pushdown condition extracted from the filter
*/
- public Expression getPColCondition(){
+ public Expression getPushDownExpression(){
return pushdownExpr;
}
- private class KeyState {
+ protected class KeyState {
LogicalExpression pushdownExpr;
LogicalExpression filterExpr;
}
- private KeyState checkPushDown(LogicalExpression op) throws FrontendException {
- // Note: Currently, Expression interface only understands 3 Expression Types
- // (Look at getExpression below) BinaryExpression, ProjectExpression and ConstantExpression
+ protected KeyState checkPushDown(LogicalExpression op) throws FrontendException {
+ // Note: Currently, the Expression interface only understands the expression types handled below
if(op instanceof ProjectExpression) {
return checkPushDown((ProjectExpression)op);
} else if (op instanceof BinaryExpression) {
@@ -165,6 +150,10 @@ public class FilterExtractor {
state.pushdownExpr = op;
state.filterExpr = null;
return state;
+ } else if(op instanceof CastExpression) {
+ return checkPushDown(((CastExpression)op).getExpression());
+ } else if (op instanceof UnaryExpression) {
+ return checkPushDown((UnaryExpression) op);
} else {
KeyState state = new KeyState();
state.pushdownExpr = null;
@@ -173,7 +162,7 @@ public class FilterExtractor {
}
}
- private LogicalExpression addToFilterPlan(LogicalExpression op) throws FrontendException {
+ protected LogicalExpression addToFilterPlan(LogicalExpression op) throws FrontendException {
// This copies the whole tree underneath op
LogicalExpression newOp = op.deepCopy(filteredPlan);
return newOp;
@@ -213,8 +202,14 @@ public class FilterExtractor {
return orOp;
}
- private KeyState checkPushDown(BinaryExpression binExpr) throws FrontendException {
+ protected KeyState checkPushDown(BinaryExpression binExpr) throws FrontendException {
KeyState state = new KeyState();
+
+ if (!isSupportedOpType(binExpr)) {
+ state.filterExpr = addToFilterPlan(binExpr);
+ state.pushdownExpr = null;
+ return state;
+ }
KeyState leftState = checkPushDown(binExpr.getLhs());
KeyState rightState = checkPushDown(binExpr.getRhs());
@@ -273,19 +268,32 @@ public class FilterExtractor {
return state;
}
- private KeyState checkPushDown(ProjectExpression project) throws FrontendException {
- String fieldName = project.getFieldSchema().alias;
+ protected KeyState checkPushDown(UnaryExpression unaryExpr) throws FrontendException {
KeyState state = new KeyState();
- if(partitionCols.contains(fieldName)) {
- state.filterExpr = null;
- state.pushdownExpr = project;
+ if (isSupportedOpType(unaryExpr)) {
+ if (unaryExpr instanceof IsNullExpression) {
+ state.pushdownExpr = unaryExpr;
+ state.filterExpr = null;
+ } else if (unaryExpr instanceof NotExpression) {
+ state.pushdownExpr = unaryExpr;
+ state.filterExpr = null;
+ } else {
+ state.filterExpr = addToFilterPlan(unaryExpr);
+ state.pushdownExpr = null;
+ }
} else {
- state.filterExpr = addToFilterPlan(project);
+ state.filterExpr = addToFilterPlan(unaryExpr);
state.pushdownExpr = null;
}
return state;
}
+ protected abstract KeyState checkPushDown(ProjectExpression project) throws FrontendException;
+
+ protected abstract boolean isSupportedOpType(BinaryExpression binOp);
+
+ protected abstract boolean isSupportedOpType(UnaryExpression unaryOp);
+
/**
* Assume that the given operator is already disconnected from its predecessors.
* @param op
@@ -311,7 +319,7 @@ public class FilterExtractor {
filteredPlan.remove( op );
}
- public static Expression getExpression(LogicalExpression op) throws FrontendException
+ public Expression getExpression(LogicalExpression op) throws FrontendException
{
if(op == null) {
return null;
@@ -323,11 +331,7 @@ public class FilterExtractor {
ProjectExpression projExpr = (ProjectExpression)op;
String fieldName = projExpr.getFieldSchema().alias;
return new Expression.Column(fieldName);
- } else {
- if( !( op instanceof BinaryExpression ) ) {
- LOG.error("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
- throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
- }
+ } else if(op instanceof BinaryExpression) {
BinaryExpression binOp = (BinaryExpression)op;
if(binOp instanceof AddExpression) {
return getExpression( binOp, OpType.OP_PLUS );
@@ -358,15 +362,35 @@ public class FilterExtractor {
} else if(binOp instanceof RegexExpression) {
return getExpression(binOp, OpType.OP_MATCH);
} else {
- LOG.error("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
- throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+ LOG.error("Unsupported conversion of BinaryExpression to Expression: " + op.getName());
+ throw new FrontendException("Unsupported conversion of BinaryExpression to Expression: " + op.getName());
}
+ } else if(op instanceof UnaryExpression) {
+ UnaryExpression unaryOp = (UnaryExpression)op;
+ if(unaryOp instanceof IsNullExpression) {
+ return getExpression(unaryOp, OpType.OP_NULL);
+ } else if(unaryOp instanceof NotExpression) {
+ return getExpression(unaryOp, OpType.OP_NOT);
+ } else if(unaryOp instanceof CastExpression) {
+ return getExpression(unaryOp.getExpression());
+ } else {
+ LOG.error("Unsupported conversion of UnaryExpression to Expression: " + op.getName());
+ throw new FrontendException("Unsupported conversion of UnaryExpression to Expression: " + op.getName());
+ }
+ } else {
+ LOG.error("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
+ throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
}
}
- private static Expression getExpression(BinaryExpression binOp, OpType
+ protected Expression getExpression(BinaryExpression binOp, OpType
opType) throws FrontendException {
return new Expression.BinaryExpression(getExpression(binOp.getLhs())
, getExpression(binOp.getRhs()), opType);
}
+
+ protected Expression getExpression(UnaryExpression unaryOp, OpType
+ opType) throws FrontendException {
+ return new Expression.UnaryExpression(getExpression(unaryOp.getExpression()), opType);
+ }
}
Added: pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java?rev=1615805&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/PartitionFilterExtractor.java Mon Aug 4 21:35:47 2014
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.UnaryExpression;
+
+/**
+ * This is a rewrite of {@code PColFilterExtractor}
+ *
+ * We traverse the expression plan bottom up and separate it into two plans
+ * - pushdownExprPlan, plan that can be pushed down to the loader and
+ * - filterExprPlan, remaining plan that needs to be evaluated by pig
+ *
+ */
+public class PartitionFilterExtractor extends FilterExtractor {
+
+ /**
+ * partition columns associated with the table
+ * present in the load on which the filter whose
+ * inner plan is being visited is applied
+ */
+ private List<String> partitionCols;
+
+
+ /**
+ * @param plan logical plan corresponding to the filter's comparison condition
+ * @param partitionCols list of partition columns of the table which is
+ * being loaded in the LOAD statement which is input to the filter
+ */
+ public PartitionFilterExtractor(LogicalExpressionPlan plan,
+ List<String> partitionCols) {
+ super(plan);
+ this.partitionCols = new ArrayList<String>(partitionCols);
+ }
+
+ @Override
+ protected KeyState checkPushDown(ProjectExpression project) throws FrontendException {
+ String fieldName = project.getFieldSchema().alias;
+ KeyState state = new KeyState();
+ if(partitionCols.contains(fieldName)) {
+ state.filterExpr = null;
+ state.pushdownExpr = project;
+ } else {
+ state.filterExpr = addToFilterPlan(project);
+ state.pushdownExpr = null;
+ }
+ return state;
+ }
+
+ @Override
+ protected boolean isSupportedOpType(BinaryExpression binOp) {
+ return true;
+ }
+
+ @Override
+ protected boolean isSupportedOpType(UnaryExpression unaryOp) {
+ return false;
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java?rev=1615805&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/PredicatePushDownFilterExtractor.java Mon Aug 4 21:35:47 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.util.List;
+
+import org.apache.pig.Expression.OpType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.expression.AddExpression;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.BinaryExpression;
+import org.apache.pig.newplan.logical.expression.DivideExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.IsNullExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LessThanExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ModExpression;
+import org.apache.pig.newplan.logical.expression.MultiplyExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.expression.UnaryExpression;
+
+/**
+ *
+ * We traverse the expression plan bottom up and separate it into two plans
+ * - pushdownExprPlan, plan that can be pushed down to the loader and
+ * - filterExprPlan, remaining plan that needs to be evaluated by pig
+ *
+ * If the predicate is not removable then filterExprPlan will not have
+ * the pushdownExprPlan removed.
+ */
+public class PredicatePushDownFilterExtractor extends FilterExtractor {
+
+ private List<String> predicateCols;
+ private List<OpType> supportedOpTypes;
+
+ public PredicatePushDownFilterExtractor(LogicalExpressionPlan plan, List<String> predicateCols,
+ List<OpType> supportedOpTypes) {
+ super(plan);
+ this.predicateCols = predicateCols;
+ this.supportedOpTypes = supportedOpTypes;
+ }
+
+ @Override
+ public void visit() throws FrontendException {
+ super.visit();
+ if (supportedOpTypes.contains(OpType.OP_BETWEEN)) {
+ // TODO: Collapse multiple ORs into BETWEEN
+ } else if (supportedOpTypes.contains(OpType.OP_IN)) {
+ // TODO: Collapse multiple ORs into IN
+ }
+ }
+
+ @Override
+ protected KeyState checkPushDown(ProjectExpression project) throws FrontendException {
+ String fieldName = project.getFieldSchema().alias;
+ KeyState state = new KeyState();
+ if(predicateCols.contains(fieldName)) {
+ state.filterExpr = null;
+ state.pushdownExpr = project;
+ } else {
+ state.filterExpr = addToFilterPlan(project);
+ state.pushdownExpr = null;
+ }
+ return state;
+ }
+
+ @Override
+ protected boolean isSupportedOpType(BinaryExpression binOp) {
+ if(binOp instanceof AddExpression) {
+ return supportedOpTypes.contains(OpType.OP_PLUS );
+ } else if(binOp instanceof SubtractExpression) {
+ return supportedOpTypes.contains(OpType.OP_MINUS);
+ } else if(binOp instanceof MultiplyExpression) {
+ return supportedOpTypes.contains(OpType.OP_TIMES);
+ } else if(binOp instanceof DivideExpression) {
+ return supportedOpTypes.contains(OpType.OP_DIV);
+ } else if(binOp instanceof ModExpression) {
+ return supportedOpTypes.contains(OpType.OP_MOD);
+ } else if(binOp instanceof AndExpression) {
+ return supportedOpTypes.contains(OpType.OP_AND);
+ } else if(binOp instanceof OrExpression) {
+ return supportedOpTypes.contains(OpType.OP_OR);
+ } else if(binOp instanceof EqualExpression) {
+ return supportedOpTypes.contains(OpType.OP_EQ);
+ } else if(binOp instanceof NotEqualExpression) {
+ return supportedOpTypes.contains(OpType.OP_NE);
+ } else if(binOp instanceof GreaterThanExpression) {
+ return supportedOpTypes.contains(OpType.OP_GT);
+ } else if(binOp instanceof GreaterThanEqualExpression) {
+ return supportedOpTypes.contains(OpType.OP_GE);
+ } else if(binOp instanceof LessThanExpression) {
+ return supportedOpTypes.contains(OpType.OP_LT);
+ } else if(binOp instanceof LessThanEqualExpression) {
+ return supportedOpTypes.contains(OpType.OP_LE);
+ } else if(binOp instanceof RegexExpression) {
+ return supportedOpTypes.contains(OpType.OP_MATCH);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ protected boolean isSupportedOpType(UnaryExpression unaryOp) {
+ if(unaryOp instanceof IsNullExpression) {
+ return supportedOpTypes.contains(OpType.OP_NULL);
+ } else if(unaryOp instanceof NotExpression) {
+ return supportedOpTypes.contains(OpType.OP_NOT);
+ } else {
+ return false;
+ }
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Mon Aug 4 21:35:47 2014
@@ -39,6 +39,7 @@ import org.apache.pig.newplan.logical.ru
import org.apache.pig.newplan.logical.rules.MergeFilter;
import org.apache.pig.newplan.logical.rules.MergeForEach;
import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
+import org.apache.pig.newplan.logical.rules.PredicatePushdownOptimizer;
import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
import org.apache.pig.newplan.logical.rules.PushUpFilter;
import org.apache.pig.newplan.logical.rules.SplitFilter;
@@ -132,6 +133,15 @@ public class LogicalPlanOptimizer extend
if (!s.isEmpty())
ls.add(s);
+ // Predicate pushdown set
+ // This set of rules pushes filter conditions to the LoadFunc
+ s = new HashSet<Rule>();
+ // Push filter predicates down to the loader
+ r = new PredicatePushdownOptimizer("PredicatePushdownOptimizer");
+ checkAndAddRule(s, r);
+ if (!s.isEmpty())
+ ls.add(s);
+
// PushDownForEachFlatten set
s = new HashSet<Rule>();
// Add the PushDownForEachFlatten
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PartitionFilterOptimizer.java Mon Aug 4 21:35:47 2014
@@ -35,7 +35,7 @@ import org.apache.pig.newplan.FilterExtr
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
-import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.PartitionFilterExtractor;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
@@ -46,9 +46,9 @@ import org.apache.pig.newplan.optimizer.
public class PartitionFilterOptimizer extends Rule {
private String[] partitionKeys;
-
+
/**
- * a reference to the LoadMetada implementation
+ * a reference to the LoadMetada implementation
*/
private LoadMetadata loadMetadata;
@@ -56,23 +56,23 @@ public class PartitionFilterOptimizer ex
* a reference to the LoadFunc implementation
*/
private LoadFunc loadFunc;
-
+
private LOLoad loLoad;
private LOFilter loFilter;
-
+
/**
- * a map between column names as reported in
+ * a map between column names as reported in
* {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}
- * and as present in {@link LOLoad#getSchema()}. The two will be different
+ * and as present in {@link LOLoad#getSchema()}. The two will be different
* when the user has provided a schema in the load statement
*/
private Map<String, String> colNameMap = new HashMap<String, String>();
-
+
/**
* a map between column nameas as present in {@link LOLoad#getSchema()} and
- * as reported in
+ * as reported in
* {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}.
- * The two will be different when the user has provided a schema in the
+ * The two will be different when the user has provided a schema in the
* load statement.
*/
private Map<String, String> reverseColNameMap = new HashMap<String, String>();
@@ -109,17 +109,17 @@ public class PartitionFilterOptimizer ex
if( succeds == null || succeds.size() == 0 || !( succeds.get(0) instanceof LOFilter ) )
return false;
loFilter = (LOFilter)succeds.get(0);
-
+
// Filter has dependency other than load, skip optimization
if (currentPlan.getSoftLinkPredecessors(loFilter)!=null)
return false;
-
+
// we have to check more only if LoadFunc implements LoadMetada
loadFunc = loLoad.getLoadFunc();
if(!( loadFunc instanceof LoadMetadata ) ) {
return false;
}
-
+
loadMetadata = (LoadMetadata)loadFunc;
try {
partitionKeys = loadMetadata.getPartitionKeys(
@@ -130,7 +130,7 @@ public class PartitionFilterOptimizer ex
if( partitionKeys == null || partitionKeys.length == 0 ) {
return false;
}
-
+
return true;
}
@@ -144,11 +144,11 @@ public class PartitionFilterOptimizer ex
subPlan = new OperatorSubPlan( currentPlan );
setupColNameMaps();
-
- FilterExtractor filterFinder = new FilterExtractor(
- loFilter.getFilterPlan(), getMappedKeys( partitionKeys ) );
+
+ FilterExtractor filterFinder = new PartitionFilterExtractor(loFilter.getFilterPlan(),
+ getMappedKeys(partitionKeys));
filterFinder.visit();
- Expression partitionFilter = filterFinder.getPColCondition();
+ Expression partitionFilter = filterFinder.getPushDownExpression();
if(partitionFilter != null) {
// the column names in the filter may be the ones provided by
@@ -168,7 +168,7 @@ public class PartitionFilterOptimizer ex
}
}
}
-
+
protected void updateMappedColNames(Expression expr) {
if(expr instanceof BinaryExpression) {
updateMappedColNames(((BinaryExpression) expr).getLhs());
@@ -180,7 +180,7 @@ public class PartitionFilterOptimizer ex
}
/**
- * The partition keys in the argument are as reported by
+ * The partition keys in the argument are as reported by
* {@link LoadMetadata#getPartitionKeys(String, org.apache.hadoop.conf.Configuration)}.
* The user may have renamed these by providing a schema with different names
* in the load statement - this method will replace the former names with
@@ -203,9 +203,9 @@ public class PartitionFilterOptimizer ex
colNameMap.put(loadFuncSchema.getField(i).alias,
(i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
loadFuncSchema.getField(i).alias));
-
+
reverseColNameMap.put((i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
- loadFuncSchema.getField(i).alias),
+ loadFuncSchema.getField(i).alias),
loadFuncSchema.getField(i).alias);
}
}
Added: pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java?rev=1615805&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/PredicatePushdownOptimizer.java Mon Aug 4 21:35:47 2014
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPredicatePushdown;
+import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.Expression.Column;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.PredicatePushDownFilterExtractor;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+public class PredicatePushdownOptimizer extends Rule {
+
+ public PredicatePushdownOptimizer(String name) {
+ super(name, false);
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ LogicalPlan plan = new LogicalPlan();
+ LogicalRelationalOperator load = new LOLoad(null, plan);
+ plan.add(load);
+ return plan;
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new PredicatePushDownTransformer();
+ }
+
+ class PredicatePushDownTransformer extends Transformer {
+
+ private LOLoad loLoad;
+ private LOFilter loFilter;
+
+ private LoadFunc loadFunc;
+ private LoadPredicatePushdown loadPredPushdown;
+
+ private List<String> predicateFields;
+
+ /**
+ * a map between column names as reported in
+ * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}
+ * and as present in {@link LOLoad#getSchema()}. The two will be different
+ * when the user has provided a schema in the load statement
+ */
+ private Map<String, String> colNameMap = new HashMap<String, String>();
+
+ /**
+ * a map between column names as present in {@link LOLoad#getSchema()} and
+ * as reported in
+ * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}.
+ * The two will be different when the user has provided a schema in the
+ * load statement.
+ */
+ private Map<String, String> reverseColNameMap = new HashMap<String, String>();
+
+ private OperatorSubPlan subPlan;
+
+ @Override
+ public boolean check(OperatorPlan matched) throws FrontendException {
+ loLoad = (LOLoad)matched.getSources().get(0);
+ // Match filter.
+ List<Operator> succeds = currentPlan.getSuccessors( loLoad );
+ if( succeds == null || succeds.size() == 0 || !( succeds.get(0) instanceof LOFilter ) )
+ return false;
+ loFilter = (LOFilter) succeds.get(0);
+
+ // Filter has dependency other than load, skip optimization
+ if (currentPlan.getSoftLinkPredecessors(loFilter) != null)
+ return false;
+
+ // we have to check more only if LoadFunc implements LoadPredicatePushdown
+ loadFunc = loLoad.getLoadFunc();
+ if (!(loadFunc instanceof LoadPredicatePushdown)) {
+ return false;
+ }
+
+ loadPredPushdown = (LoadPredicatePushdown) loadFunc;
+ try {
+ predicateFields = loadPredPushdown.getPredicateFields(loLoad.getFileSpec()
+ .getFileName(), new Job(loLoad.getConfiguration()));
+ } catch (IOException e) {
+ throw new FrontendException(e);
+ }
+ if (predicateFields == null || predicateFields.size() == 0) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ // Return null in case predicate pushdown is just a hint which means the plan hasn't changed.
+ // If not return the modified plan which has filters removed.
+ return null;
+ //return subPlan; TODO: implement filter removal
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws FrontendException {
+ subPlan = new OperatorSubPlan( currentPlan );
+
+ setupColNameMaps();
+
+ PredicatePushDownFilterExtractor filterFinder = new PredicatePushDownFilterExtractor(
+ loFilter.getFilterPlan(), getMappedKeys( predicateFields ), loadPredPushdown.getSupportedExpressionTypes() );
+ filterFinder.visit();
+ Expression pushDownPredicate = filterFinder.getPushDownExpression();
+
+ if(pushDownPredicate != null) {
+ // the column names in the filter may be the ones provided by
+ // the user in the schema in the load statement - we may need
+ // to replace them with the column names as given by
+ // LoadFunc.getSchema()
+ updateMappedColNames(pushDownPredicate);
+ try {
+ loadPredPushdown.setPushdownPredicate(pushDownPredicate);
+ } catch (IOException e) {
+ throw new FrontendException( e );
+ }
+ }
+ }
+
+ protected void updateMappedColNames(Expression expr) {
+ if(expr instanceof BinaryExpression) {
+ updateMappedColNames(((BinaryExpression) expr).getLhs());
+ updateMappedColNames(((BinaryExpression) expr).getRhs());
+ } else if (expr instanceof Column) {
+ Column col = (Column) expr;
+ col.setName(reverseColNameMap.get(col.getName()));
+ }
+ }
+
+ /**
+ * The predicate fields in the argument are as reported by
+ * {@link LoadPredicatePushdown#getPredicateFields(String, org.apache.hadoop.mapreduce.Job)}.
+ * The user may have renamed these by providing a schema with different names
+ * in the load statement - this method will replace the former names with
+ * the latter names.
+ * @param predicateFields field names as reported by the loader
+ * @return the corresponding names from the load statement's schema, in the same order
+ */
+ protected List<String> getMappedKeys(List<String> predicateFields) {
+ List<String> mappedKeys = new ArrayList<String>(predicateFields.size());
+ for (int i = 0; i < predicateFields.size(); i++) {
+ mappedKeys.add(colNameMap.get(predicateFields.get(i)));
+ }
+ return mappedKeys;
+ }
+
+ protected void setupColNameMaps() throws FrontendException {
+ LogicalSchema loLoadSchema = loLoad.getSchema();
+ LogicalSchema loadFuncSchema = loLoad.getDeterminedSchema();
+ for(int i = 0; i < loadFuncSchema.size(); i++) {
+ colNameMap.put(loadFuncSchema.getField(i).alias,
+ (i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
+ loadFuncSchema.getField(i).alias));
+
+ reverseColNameMap.put((i < loLoadSchema.size() ? loLoadSchema.getField(i).alias :
+ loadFuncSchema.getField(i).alias),
+ loadFuncSchema.getField(i).alias);
+ }
+ }
+
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java Mon Aug 4 21:35:47 2014
@@ -33,30 +33,30 @@ import org.apache.pig.newplan.OperatorPl
* transform or until maxIter iterations (default 500) has been made over
* the RuleSet. Then the next RuleSet will be moved to. Once finished,
* a given RuleSet is never returned to.
- *
+ *
* Each rule is has two parts: a pattern and and associated transformer.
* Transformers have two important functions: check(), and transform().
* The pattern describes a pattern of node types that the optimizer will
* look to match. If that match is found anywhere in the plan, then check()
- * will be called. check() allows the rule to look more in depth at the
+ * will be called. check() allows the rule to look more in depth at the
* matched pattern and decide whether the rule should be run or not. For
* example, one might design a rule to push filters above join that would
* look for the pattern filter(join) (meaning a filter followed by a join).
- * But only certain types of filters can be pushed. The check() function
+ * But only certain types of filters can be pushed. The check() function
* would need to decide whether the filter that it found was pushable or not.
* If check() returns true, the rule is said to have matched, and transform()
* is then called. This function is responsible for making changes in the
* logical plan. Once transform is complete PlanPatcher.patchUp will be
- * called to do any necessary cleanup in the plan, such as resetting
+ * called to do any necessary cleanup in the plan, such as resetting
* schemas, etc.
*/
public abstract class PlanOptimizer {
-
+
protected List<Set<Rule>> ruleSets;
protected OperatorPlan plan;
protected List<PlanTransformListener> listeners;
protected int maxIter;
-
+
static final int defaultIterations = 500;
/**
@@ -66,16 +66,16 @@ public abstract class PlanOptimizer {
* set to -1 for default
*/
protected PlanOptimizer(OperatorPlan p,
- List<Set<Rule>> rs,
+ List<Set<Rule>> rs,
int iterations) {
plan = p;
ruleSets = rs;
listeners = new ArrayList<PlanTransformListener>();
maxIter = (iterations < 1 ? defaultIterations : iterations);
}
-
+
/**
- * Adds a listener to the optimization. This listener will be fired
+ * Adds a listener to the optimization. This listener will be fired
* after each rule transforms a plan. Listeners are guaranteed to
* be fired in the order they are added.
* @param listener
@@ -83,13 +83,13 @@ public abstract class PlanOptimizer {
protected void addPlanTransformListener(PlanTransformListener listener) {
listeners.add(listener);
}
-
+
/**
* Run the optimizer. This method attempts to match each of the Rules
* against the plan. If a Rule matches, it then calls the check
* method of the associated Transformer to give it a chance to
* check whether it really wants to do the optimization. If that
- * returns true as well, then Transformer.transform is called.
+ * returns true as well, then Transformer.transform is called.
* @throws FrontendException
*/
public void optimize() throws FrontendException {
@@ -108,9 +108,12 @@ public abstract class PlanOptimizer {
if (transformer.check(m)) {
sawMatch = true;
transformer.transform(m);
- if (!rule.isSkipListener()) {
+ OperatorPlan change = transformer.reportChanges();
+ if (change == null) {
+ sawMatch = false;
+ } else if (!rule.isSkipListener()) {
for(PlanTransformListener l: listeners) {
- l.transformed(plan, transformer.reportChanges());
+ l.transformed(plan, change);
}
}
}
Modified: pig/trunk/test/org/apache/pig/builtin/TestOrcStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestOrcStorage.java?rev=1615805&r1=1615804&r2=1615805&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestOrcStorage.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestOrcStorage.java Mon Aug 4 21:35:47 2014
@@ -65,6 +65,7 @@ import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestOrcStorage {
@@ -82,13 +83,8 @@ public class TestOrcStorage {
private static PigServer pigServer = null;
private static FileSystem fs;
- @Before
- public void setup() throws ExecException, IOException {
- pigServer = new PigServer(ExecType.LOCAL);
- fs = FileSystem.get(ConfigurationUtil.toConfiguration(pigServer.getPigContext().getProperties()));
- deleteTestFiles();
- pigServer.mkdirs(outbasedir);
- generateInputFiles();
+ @BeforeClass
+ public static void oneTimeSetup(){
if(Util.WINDOWS){
INPUT1 = INPUT1.replace("\\", "/");
OUTPUT1 = OUTPUT1.replace("\\", "/");
@@ -98,6 +94,15 @@ public class TestOrcStorage {
}
}
+ @Before
+ public void setup() throws ExecException, IOException {
+ pigServer = new PigServer(ExecType.LOCAL);
+ fs = FileSystem.get(ConfigurationUtil.toConfiguration(pigServer.getPigContext().getProperties()));
+ deleteTestFiles();
+ pigServer.mkdirs(outbasedir);
+ generateInputFiles();
+ }
+
@After
public void teardown() throws IOException {
if(pigServer != null) {
@@ -234,7 +239,6 @@ public class TestOrcStorage {
fs.delete(new Path(OUTPUT4, "_SUCCESS"), true);
pigServer.registerQuery("A = load '" + OUTPUT4 + "' using OrcStorage();" );
- Schema s = pigServer.dumpSchema("A");
Iterator<Tuple> iter = pigServer.openIterator("A");
Tuple t = iter.next();
assertTrue(t.toString().startsWith("(false,1,1024,65536,9223372036854775807,1.0,-15.0," +
@@ -313,6 +317,7 @@ public class TestOrcStorage {
}
+ @SuppressWarnings("rawtypes")
private void compareData(Object expected, Object actual) {
if (expected instanceof Text) {
assertEquals(String.class, actual.getClass());