You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/10/05 23:18:25 UTC
svn commit: r1394816 [1/2] - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/par...
Author: jcoveney
Date: Fri Oct 5 21:18:23 2012
New Revision: 1394816
URL: http://svn.apache.org/viewvc?rev=1394816&view=rev
Log:
PIG-2877: Make SchemaTuple work in foreach (and thus, in loads) (jcoveney)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
pig/trunk/src/org/apache/pig/data/BinInterSedes.java
pig/trunk/src/org/apache/pig/data/SchemaTuple.java
pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java
pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java
pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
pig/trunk/test/org/apache/pig/test/TestBuiltin.java
pig/trunk/test/org/apache/pig/test/TestCommit.java
pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java
pig/trunk/test/org/apache/pig/test/TestLoad.java
pig/trunk/test/org/apache/pig/test/TestPigServer.java
pig/trunk/test/org/apache/pig/test/TestPigServerWithMacros.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Oct 5 21:18:23 2012
@@ -25,6 +25,8 @@ PIG-1891 Enable StoreFunc to make intell
IMPROVEMENTS
+PIG-2877: Make SchemaTuple work in foreach (and thus, in loads) (jcoveney)
+
PIG-2923: Lazily register bags with SpillableMemoryManager (dvryaboy)
PIG-2929: Improve documentation around AVG, CONCAT, MIN, MAX (cheolsoo via billgraham)
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Fri Oct 5 21:18:23 2012
@@ -19,9 +19,15 @@
package org.apache.pig;
/**
- * Container for static configuration strings, defaults, etc.
+ * Container for static configuration strings, defaults, etc. This is intended just for keys that can
+ * be set by users, not for keys that are generally used within pig.
*/
public class PigConfiguration {
+ private PigConfiguration() {}
+
+ /////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////// COMMAND LINE KEYS /////////////////////////////
+ /////////////////////////////////////////////////////////////////////////////////////
/**
* Controls the fraction of total memory that is allowed to be used by
@@ -48,6 +54,23 @@ public class PigConfiguration {
public static final String TIME_UDFS_PROP = "pig.udf.profile";
/**
+ * This key must be set to true by the user for code generation to be used.
+ * In the future, it may be turned on by default (at least in certain cases),
+ * but for now it is too experimental.
+ */
+ public static final String SHOULD_USE_SCHEMA_TUPLE = "pig.schematuple";
+
+ public static final String SCHEMA_TUPLE_SHOULD_USE_IN_UDF = "pig.schematuple.udf";
+
+ public static final String SCHEMA_TUPLE_SHOULD_USE_IN_FOREACH = "pig.schematuple.foreach";
+
+ public static final String SCHEMA_TUPLE_SHOULD_USE_IN_FRJOIN = "pig.schematuple.fr_join";
+
+ public static final String SCHEMA_TUPLE_SHOULD_USE_IN_MERGEJOIN = "pig.schematuple.merge_join";
+
+ public static final String SCHEMA_TUPLE_SHOULD_ALLOW_FORCE = "pig.schematuple.force";
+
+ /**
* Turns off use of combiners in MapReduce jobs produced by Pig.
*/
public static final String PROP_NO_COMBINER = "pig.exec.nocombiner";
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Oct 5 21:18:23 2012
@@ -280,7 +280,7 @@ public class HExecutionEngine {
SortInfoSetter sortInfoSetter = new SortInfoSetter( plan );
sortInfoSetter.visit();
- if (pigContext.inExplain==false) {
+ if (!pigContext.inExplain) {
// Validate input/output file. Currently no validation framework in
// new logical plan, put this validator here first.
// We might decide to move it out to a validator framework in future
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Fri Oct 5 21:18:23 2012
@@ -91,7 +91,7 @@ public class HJob implements ExecJob {
p = (LoadFunc) new ReadToEndLoader(originalLoadFunc,
ConfigurationUtil.toConfiguration(
- pigContext.getProperties()), outFileSpec.getFileName(), 0);
+ pigContext.getProperties()), outFileSpec.getFileName(), 0, pigContext);
}catch (Exception e){
int errCode = 2088;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Oct 5 21:18:23 2012
@@ -25,7 +25,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -35,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -70,13 +68,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
@@ -90,11 +86,11 @@ import org.apache.pig.impl.io.FileLocali
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.NullableBooleanWritable;
import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.NullableDateTimeWritable;
import org.apache.pig.impl.io.NullableDoubleWritable;
import org.apache.pig.impl.io.NullableFloatWritable;
import org.apache.pig.impl.io.NullableIntWritable;
import org.apache.pig.impl.io.NullableLongWritable;
-import org.apache.pig.impl.io.NullableDateTimeWritable;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableText;
import org.apache.pig.impl.io.NullableTuple;
@@ -446,7 +442,6 @@ public class JobControlCompiler{
ss.addSettingsToConf(mro, conf);
}
-
conf.set("mapred.mapper.new-api", "true");
conf.set("mapred.reducer.new-api", "true");
@@ -655,7 +650,7 @@ public class JobControlCompiler{
setupDistributedCacheForJoin(mro, pigContext, conf);
// Search to see if we have any UDFs that need to pack things into the
- // distrubted cache.
+ // distributed cache.
setupDistributedCacheForUdfs(mro, pigContext, conf);
SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Fri Oct 5 21:18:23 2012
@@ -172,7 +172,7 @@ public abstract class PigGenericMapBase
pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
// This attempts to fetch all of the generated code from the distributed cache, and resolve it
- SchemaTupleBackend.initialize(job, pigContext.getExecType());
+ SchemaTupleBackend.initialize(job, pigContext);
if (pigContext.getLog4jProperties()!=null)
PropertyConfigurator.configure(pigContext.getLog4jProperties());
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Fri Oct 5 21:18:23 2012
@@ -316,7 +316,7 @@ public class PigGenericMapReduce {
pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
// This attempts to fetch all of the generated code from the distributed cache, and resolve it
- SchemaTupleBackend.initialize(jConf, pigContext.getExecType());
+ SchemaTupleBackend.initialize(jConf, pigContext);
if (rp == null)
rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Oct 5 21:18:23 2012
@@ -43,11 +43,11 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.NullableBooleanWritable;
import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.NullableDateTimeWritable;
import org.apache.pig.impl.io.NullableDoubleWritable;
import org.apache.pig.impl.io.NullableFloatWritable;
import org.apache.pig.impl.io.NullableIntWritable;
import org.apache.pig.impl.io.NullableLongWritable;
-import org.apache.pig.impl.io.NullableDateTimeWritable;
import org.apache.pig.impl.io.NullableText;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
@@ -107,8 +107,6 @@ public class WeightedRangePartitioner ex
}
try{
-
-
// use local file system to get the quantilesFile
Configuration conf;
if (pigContext.getExecType()==ExecType.MAPREDUCE) {
@@ -116,24 +114,25 @@ public class WeightedRangePartitioner ex
} else {
conf = new Configuration(false);
}
- if (configuration.get("fs.file.impl")!=null)
+ if (configuration.get("fs.file.impl") != null) {
conf.set("fs.file.impl", configuration.get("fs.file.impl"));
- if (configuration.get("fs.hdfs.impl")!=null)
+ }
+ if (configuration.get("fs.hdfs.impl") != null) {
conf.set("fs.hdfs.impl", configuration.get("fs.hdfs.impl"));
- if (configuration.getBoolean("pig.tmpfilecompression", false))
- {
+ }
+ if (configuration.getBoolean("pig.tmpfilecompression", false)) {
conf.setBoolean("pig.tmpfilecompression", true);
- if (configuration.get("pig.tmpfilecompression.codec")!=null)
+ if (configuration.get("pig.tmpfilecompression.codec") != null) {
conf.set("pig.tmpfilecompression.codec", configuration.get("pig.tmpfilecompression.codec"));
}
+ }
conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
ReadToEndLoader loader = new ReadToEndLoader(Utils.getTmpFileStorageObject(conf),
conf, quantilesFile, 0);
DataBag quantilesList;
Tuple t = loader.getNext();
- if(t!=null)
- {
+ if (t != null) {
// the Quantiles file has a tuple as under:
// (numQuantiles, bag of samples)
// numQuantiles here is the reduce parallelism
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Oct 5 21:18:23 2012
@@ -292,7 +292,6 @@ public abstract class PhysicalOperator e
* @throws ExecException
*/
public Result processInput() throws ExecException {
-
Result res = new Result();
if (input == null && (inputs == null || inputs.size()==0)) {
// log.warn("No inputs found. Signaling End of Processing.");
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Oct 5 21:18:23 2012
@@ -36,8 +36,12 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.SchemaTupleFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.TupleMaker;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -48,15 +52,10 @@ import org.apache.pig.pen.util.LineageTr
//We intentionally skip type checking in backend for performance reasons
@SuppressWarnings("unchecked")
public class POForEach extends PhysicalOperator {
-
- /**
- *
- */
private static final long serialVersionUID = 1L;
protected List<PhysicalPlan> inputPlans;
protected List<PhysicalOperator> opsToBeReset;
- protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
//Since the plan has a generate, this needs to be maintained
//as the generate can potentially return multiple tuples for
//same call.
@@ -93,14 +92,12 @@ public class POForEach extends PhysicalO
protected Tuple inpTuple;
+ private Schema schema;
+
public POForEach(OperatorKey k) {
this(k,-1,null,null);
}
- public POForEach(OperatorKey k, int rp, List inp) {
- this(k,rp,inp,null);
- }
-
public POForEach(OperatorKey k, int rp) {
this(k,rp,null,null);
}
@@ -117,6 +114,12 @@ public class POForEach extends PhysicalO
getLeaves();
}
+ public POForEach(OperatorKey operatorKey, int requestedParallelism,
+ List<PhysicalPlan> innerPlans, List<Boolean> flattenList, Schema schema) {
+ this(operatorKey, requestedParallelism, innerPlans, flattenList);
+ this.schema = schema;
+ }
+
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitPOForEach(this);
@@ -301,6 +304,8 @@ public class POForEach extends PhysicalO
}
private boolean isEarlyTerminated = false;
+ private TupleMaker<? extends Tuple> tupleMaker;
+ private boolean knownSize = false;
private boolean isEarlyTerminated() {
return isEarlyTerminated;
@@ -311,6 +316,19 @@ public class POForEach extends PhysicalO
}
protected Result processPlan() throws ExecException{
+ if (schema != null && tupleMaker == null) {
+ // Note here that if SchemaTuple is currently turned on, then any UDFs in the chain
+ // must follow good practices. Namely, they should not append to the Tuple that comes
+ // out of an iterator (a practice which is fairly common, but is not recommended).
+ tupleMaker = SchemaTupleFactory.getInstance(schema, false, GenContext.FOREACH);
+ if (tupleMaker != null) {
+ knownSize = true;
+ }
+ }
+ if (tupleMaker == null) {
+ tupleMaker = TupleFactory.getInstance();
+ }
+
Result res = new Result();
//We check if all the databags have exhausted the tuples. If so we enforce the reading of new data by setting data and its to null
@@ -471,7 +489,9 @@ public class POForEach extends PhysicalO
* @return the final flattened tuple
*/
protected Tuple createTuple(Object[] data) throws ExecException {
- Tuple out = mTupleFactory.newTuple();
+ Tuple out = tupleMaker.newTuple();
+
+ int idx = 0;
for(int i = 0; i < data.length; ++i) {
Object in = data[i];
@@ -479,12 +499,20 @@ public class POForEach extends PhysicalO
Tuple t = (Tuple)in;
int size = t.size();
for(int j = 0; j < size; ++j) {
+ if (knownSize) {
+ out.set(idx++, t.get(j));
+ } else {
out.append(t.get(j));
}
+ }
+ } else {
+ if (knownSize) {
+ out.set(idx++, in);
} else {
out.append(in);
}
}
+ }
if (inpTuple != null) {
return illustratorMarkup(inpTuple, out, 0);
} else {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Fri Oct 5 21:18:23 2012
@@ -21,32 +21,28 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
-import org.apache.pig.SortInfo;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalSortedBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.pen.util.LineageTracer;
/**
* This implementation is applicable for both the physical plan and for the
@@ -304,19 +300,16 @@ public class POSort extends PhysicalOper
@Override
public boolean supportsMultipleInputs() {
-
return false;
}
@Override
public boolean supportsMultipleOutputs() {
-
return false;
}
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
-
v.visitSort(this);
}
Modified: pig/trunk/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/BinInterSedes.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/BinInterSedes.java (original)
+++ pig/trunk/src/org/apache/pig/data/BinInterSedes.java Fri Oct 5 21:18:23 2012
@@ -28,9 +28,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -45,6 +42,8 @@ import org.apache.pig.classification.Int
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.utils.SedesHelper;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
/**
* A class to handle reading and writing of intermediate results of data types. The serialization format used by this
@@ -168,7 +167,6 @@ public class BinInterSedes implements In
}
public int getTupleSize(DataInput in, byte type) throws IOException {
-
int sz;
switch (type) {
case TUPLE_0:
@@ -542,8 +540,7 @@ public class BinInterSedes implements In
}
case DataType.CHARARRAY: {
- String s = (String) val;
- SedesHelper.writeChararray(out, s);
+ SedesHelper.writeChararray(out, (String) val);
break;
}
case DataType.GENERIC_WRITABLECOMPARABLE:
Modified: pig/trunk/src/org/apache/pig/data/SchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTuple.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTuple.java Fri Oct 5 21:18:23 2012
@@ -34,9 +34,8 @@ import org.apache.pig.classification.Int
import org.apache.pig.data.utils.MethodHelper;
import org.apache.pig.data.utils.MethodHelper.NotImplemented;
import org.apache.pig.data.utils.SedesHelper;
-import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.Utils;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.mortbay.log.Log;
import com.google.common.collect.Lists;
@@ -195,12 +194,12 @@ public abstract class SchemaTuple<T exte
}
protected static DataBag read(DataInput in, DataBag v) throws IOException {
- return (DataBag) bis.readDatum(in, DataType.BAG);
+ return (DataBag) bis.readDatum(in);
}
@SuppressWarnings("unchecked")
protected static Map<String, Object> read(DataInput in, Map<String, Object> v) throws IOException {
- return (Map<String, Object>) bis.readDatum(in, DataType.MAP);
+ return (Map<String, Object>) bis.readDatum(in);
}
protected static int read(DataInput in, int v) throws IOException {
@@ -355,11 +354,11 @@ public abstract class SchemaTuple<T exte
}
protected DataBag unbox(Object v, DataBag t) {
- return unbox((DataBag) t);
+ return unbox((DataBag) v);
}
protected Map<String, Object> unbox(Object v, Map<String, Object> t) {
- return unbox((Map<String, Object>) t);
+ return unbox((Map<String, Object>) v);
}
protected byte[] unbox(Object v, byte[] t) {
@@ -839,10 +838,9 @@ public abstract class SchemaTuple<T exte
Log.warn("No Schema present in SchemaTuple generated class");
return new Schema();
}
- s = new String(Base64.decodeBase64(s));
- return Utils.getSchemaFromString(s);
- } catch (FrontendException e) {
- throw new RuntimeException("Unable to make Schema for String: " + s);
+ return (Schema) ObjectSerializer.deserialize(s);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to deserialize serialized Schema: " + s, e);
}
}
Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java Fri Oct 5 21:18:23 2012
@@ -17,6 +17,9 @@
*/
package org.apache.pig.data;
+import static org.apache.pig.PigConfiguration.SHOULD_USE_SCHEMA_TUPLE;
+import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -31,9 +34,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConstants;
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
import org.apache.pig.data.utils.StructuresHelper.SchemaKey;
import org.apache.pig.data.utils.StructuresHelper.Triple;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import com.google.common.collect.Maps;
@@ -61,19 +66,6 @@ public class SchemaTupleBackend {
private boolean abort = false;
/**
- * This key must be set to true by the user for code generation to be used.
- * In the future, it may be turned on by default (at least in certain cases),
- * but for now it is too experimental.
- */
- public static final String SHOULD_GENERATE_KEY = "pig.schematuple";
-
- /**
- * This key is used in the job conf to let the various jobs know what code was
- * generated.
- */
- public static final String GENERATED_CLASSES_KEY = "pig.schematuple.classes";
-
- /**
* The only information this class needs is a directory of generated code to resolve
* classes in.
* @param jConf
@@ -81,13 +73,13 @@ public class SchemaTupleBackend {
*/
private SchemaTupleBackend(Configuration jConf, boolean isLocal) {
if (isLocal) {
- String localCodeDir = jConf.get(SchemaTupleFrontend.LOCAL_CODE_DIR);
+ String localCodeDir = jConf.get(PigConstants.LOCAL_CODE_DIR);
if (localCodeDir == null) {
LOG.debug("No local code dir set in local mode. Aborting code gen resolution.");
abort = true;
return;
}
- codeDir = new File(jConf.get(SchemaTupleFrontend.LOCAL_CODE_DIR));
+ codeDir = new File(jConf.get(PigConstants.LOCAL_CODE_DIR));
} else {
codeDir = Files.createTempDir();
codeDir.deleteOnExit();
@@ -126,7 +118,7 @@ public class SchemaTupleBackend {
private SchemaTupleFactory internalNewSchemaTupleFactory(int id) {
SchemaTupleFactory stf = schemaTupleFactoriesById.get(id);
if (stf == null) {
- LOG.warn("No SchemaTupleFactory present for given identifier: " + id);
+ LOG.debug("No SchemaTupleFactory present for given identifier: " + id);
}
return stf;
}
@@ -141,7 +133,7 @@ public class SchemaTupleBackend {
private SchemaTupleFactory newSchemaTupleFactory(Triple<SchemaKey, Boolean, GenContext> trip) {
SchemaTupleFactory stf = schemaTupleFactoriesByTriple.get(trip);
if (stf == null) {
- SchemaTupleFactory.LOG.warn("No SchemaTupleFactory present for given SchemaKey/Boolean/Context combination " + trip);
+ LOG.debug("No SchemaTupleFactory present for given SchemaKey/Boolean/Context combination " + trip);
}
return stf;
}
@@ -159,9 +151,8 @@ public class SchemaTupleBackend {
return;
}
// Step one is to see if there are any classes in the distributed cache
- String shouldGenerate = jConf.get(SchemaTupleBackend.SHOULD_GENERATE_KEY);
- if (shouldGenerate == null || !Boolean.parseBoolean(shouldGenerate)) {
- LOG.info("Key [" + SchemaTupleBackend.SHOULD_GENERATE_KEY +"] was not set... aborting generation");
+ if (!jConf.getBoolean(SHOULD_USE_SCHEMA_TUPLE, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
+ LOG.info("Key [" + SHOULD_USE_SCHEMA_TUPLE +"] was not set... will not generate code.");
return;
}
// Step two is to copy everything from the distributed cache if we are in distributed mode
@@ -183,12 +174,12 @@ public class SchemaTupleBackend {
}
private void copyAllFromDistributedCache() throws IOException {
- String toDeserialize = jConf.get(GENERATED_CLASSES_KEY);
+ String toDeserialize = jConf.get(PigConstants.GENERATED_CLASSES_KEY);
if (toDeserialize == null) {
- LOG.info("No classes in in key [" + GENERATED_CLASSES_KEY + "] to copy from distributed cache.");
+ LOG.info("No classes in in key [" + PigConstants.GENERATED_CLASSES_KEY + "] to copy from distributed cache.");
return;
}
- LOG.info("Copying files in key ["+GENERATED_CLASSES_KEY+"] from distributed cache: " + toDeserialize);
+ LOG.info("Copying files in key ["+PigConstants.GENERATED_CLASSES_KEY+"] from distributed cache: " + toDeserialize);
for (String s : toDeserialize.split(",")) {
LOG.info("Attempting to read file: " + s);
// The string is the symlink into the distributed cache
@@ -273,22 +264,36 @@ public class SchemaTupleBackend {
private static SchemaTupleBackend stb;
- public static void initialize(Configuration jConf, ExecType type) throws IOException {
- stb = new SchemaTupleBackend(jConf, type == ExecType.LOCAL);
+ public static void initialize(Configuration jConf, PigContext pigContext) throws IOException {
+ initialize(jConf, pigContext, pigContext.getExecType() == ExecType.LOCAL);
+ }
+
+ public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException {
+ if (stb != null) {
+ LOG.warn("SchemaTupleBackend has already been initialized");
+ } else {
+ SchemaTupleFrontend.lazyReset(pigContext);
+ SchemaTupleFrontend.reset();
+ stb = new SchemaTupleBackend(jConf, isLocal);
stb.copyAndResolve();
}
+ }
public static SchemaTupleFactory newSchemaTupleFactory(Schema s, boolean isAppendable, GenContext context) {
if (stb == null) {
// It is possible (though ideally should be avoided) for this to be called on the frontend if
// the Tuple processing path of the POPlan is invoked (perhaps for optimization purposes)
- LOG.warn("initialize was not called! Even when SchemaTuple feature is not set, it should be called.");
- return null;
+ throw new RuntimeException("initialize was not called! Even when SchemaTuple feature is not set, it should be called.");
}
return stb.internalNewSchemaTupleFactory(s, isAppendable, context);
}
protected static SchemaTupleFactory newSchemaTupleFactory(int id) {
+ if (stb == null) {
+ // It is possible (though ideally should be avoided) for this to be called on the frontend if
+ // the Tuple processing path of the POPlan is invoked (perhaps for optimization purposes)
+ throw new RuntimeException("initialize was not called! Even when SchemaTuple feature is not set, it should be called.");
+ }
return stb.internalNewSchemaTupleFactory(id);
}
}
Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java Fri Oct 5 21:18:23 2012
@@ -18,6 +18,7 @@
package org.apache.pig.data;
import java.io.File;
+import java.io.IOException;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@@ -25,14 +26,15 @@ import java.lang.annotation.Target;
import java.util.List;
import java.util.Queue;
-import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.JavaCompilerHelper;
+import org.apache.pig.impl.util.ObjectSerializer;
import com.google.common.collect.Lists;
@@ -59,28 +61,27 @@ public class SchemaTupleClassGenerator {
* This context is used in UDF code. Currently, this is only used for
* the inputs to UDF's.
*/
- UDF ("pig.schematuple.udf", true, GenerateUdf.class),
+ UDF (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_UDF, true, GenerateUdf.class),
/**
- * This context is for LoadFuncs. It is currently not used,
- * however the intent is that when a Schema is known, the
- * LoadFunc can return typed Tuples.
+ * This context is for POForEach. This will use the expected output of a ForEach
+ * to return a typed Tuple.
*/
- LOAD ("pig.schematuple.load", true, GenerateLoad.class),
+ FOREACH (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_FOREACH, true, GenerateForeach.class),
/**
* This context controls whether or not SchemaTuples will be used in FR joins.
* Currently, they will be used in the HashMap that FR Joins construct.
*/
- FR_JOIN ("pig.schematuple.fr_join", true, GenerateFrJoin.class),
+ FR_JOIN (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_FRJOIN, true, GenerateFrJoin.class),
/**
* This context controls whether or not SchemaTuples will be used in merge joins.
*/
- MERGE_JOIN ("pig.schematuple.merge_join", true, GenerateMergeJoin.class),
+ MERGE_JOIN (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_MERGEJOIN, true, GenerateMergeJoin.class),
/**
* All registered Schemas will also be registered in one additional context.
* This context will allow users to "force" the load of a SchemaTupleFactory
* if one is present in any context.
*/
- FORCE_LOAD ("pig.schematuple.force", true, GenerateForceLoad.class);
+ FORCE_LOAD (PigConfiguration.SCHEMA_TUPLE_SHOULD_ALLOW_FORCE, true, GenerateForceLoad.class);
/**
* These annotations are used to mark a given SchemaTuple with
@@ -93,7 +94,7 @@ public class SchemaTupleClassGenerator {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
- public @interface GenerateLoad {}
+ public @interface GenerateForeach {}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@@ -158,6 +159,10 @@ public class SchemaTupleClassGenerator {
*/
private static int nextGlobalClassIdentifier = 0;
+ protected static void resetGlobalClassIdentifier() {
+ nextGlobalClassIdentifier = 0;
+ }
+
/**
* This class actually generates the code for a given Schema.
* @param schema
@@ -319,9 +324,12 @@ public class SchemaTupleClassGenerator {
private File codeDir;
public void prepare() {
- String s = schema.toString();
- s = s.substring(1, s.length() - 1);
- s = Base64.encodeBase64URLSafeString(s.getBytes());
+ String s;
+ try {
+ s = ObjectSerializer.serialize(schema);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to serialize schema: " + schema, e);
+ }
add("private static Schema schema = staticSchemaGen(\"" + s + "\");");
}
Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java Fri Oct 5 21:18:23 2012
@@ -52,7 +52,7 @@ public class SchemaTupleFactory implemen
* @return true if it is generatable
*/
public static boolean isGeneratable(Schema s) {
- if (s == null) {
+ if (s == null || s.size() == 0) {
return false;
}
Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java Fri Oct 5 21:18:23 2012
@@ -17,11 +17,17 @@
*/
package org.apache.pig.data;
+import static org.apache.pig.PigConfiguration.SHOULD_USE_SCHEMA_TUPLE;
+import static org.apache.pig.PigConstants.GENERATED_CLASSES_KEY;
+import static org.apache.pig.PigConstants.LOCAL_CODE_DIR;
+import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT;
+
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -45,7 +51,6 @@ import com.google.common.io.Files;
/**
* This class is to be used at job creation time. It provides the API that lets code
* register Schemas with pig to be generated. It is necessary to register these Schemas
- * so that the generated code can be made on the client side, and shipped to the mappers
* and reducers.
*/
public class SchemaTupleFrontend {
@@ -110,10 +115,12 @@ public class SchemaTupleFrontend {
String codePath = codeDir.getAbsolutePath();
LOG.info("Distributed cache not supported or needed in local mode. Setting key ["
+ LOCAL_CODE_DIR + "] with code temp directory: " + codePath);
- if (pigContext.getExecType() == ExecType.LOCAL) {
conf.set(LOCAL_CODE_DIR, codePath);
- }
return;
+ } else {
+ // This lets us avoid an NPE in some of the non-traditional pipelines
+ String codePath = codeDir.getAbsolutePath();
+ conf.set(LOCAL_CODE_DIR, codePath);
}
DistributedCache.createSymlink(conf); // we will read using symlinks
StringBuilder serialized = new StringBuilder();
@@ -157,9 +164,9 @@ public class SchemaTupleFrontend {
LOG.info("File successfully added to the distributed cache: " + symlink);
}
String toSer = serialized.toString();
- LOG.info("Setting key [" + SchemaTupleBackend.GENERATED_CLASSES_KEY + "] with classes to deserialize [" + toSer + "]");
+ LOG.info("Setting key [" + GENERATED_CLASSES_KEY + "] with classes to deserialize [" + toSer + "]");
// we must set a key in the job conf so individual jobs know to resolve the shipped classes
- conf.set(SchemaTupleBackend.GENERATED_CLASSES_KEY, toSer);
+ conf.set(GENERATED_CLASSES_KEY, toSer);
}
/**
@@ -169,9 +176,8 @@ public class SchemaTupleFrontend {
*/
private boolean generateAll(Map<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> schemasToGenerate) {
boolean filesToShip = false;
- String shouldString = conf.get(SchemaTupleBackend.SHOULD_GENERATE_KEY);
- if (shouldString == null || !Boolean.parseBoolean(shouldString)) {
- LOG.info("Key ["+SchemaTupleBackend.SHOULD_GENERATE_KEY+"] is false, aborting generation.");
+ if (!conf.getBoolean(SHOULD_USE_SCHEMA_TUPLE, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
+ LOG.info("Key ["+SHOULD_USE_SCHEMA_TUPLE+"] is false, will not generate code.");
return false;
}
LOG.info("Generating all registered Schemas.");
@@ -222,6 +228,14 @@ public class SchemaTupleFrontend {
*/
public static int registerToGenerateIfPossible(Schema udfSchema, boolean isAppendable, GenContext context) {
if (stf == null) {
+ if (pigContextToReset != null) {
+ Properties prop = pigContextToReset.getProperties();
+ prop.remove(GENERATED_CLASSES_KEY);
+ prop.remove(LOCAL_CODE_DIR);
+ pigContextToReset = null;
+ }
+ SchemaTupleBackend.reset();
+ SchemaTupleClassGenerator.resetGlobalClassIdentifier();
stf = new SchemaTupleFrontend();
}
@@ -249,12 +263,6 @@ public class SchemaTupleFrontend {
}
/**
- * This key is used when a job is run in local mode to pass the location of the generated code
- * from the frontent to the "backend."
- */
- protected static final String LOCAL_CODE_DIR = "pig.schematuple.local.dir";
-
- /**
* This must be called when the code has been generated and the generated code needs to be shipped
* to the cluster, so that it may be used by the mappers and reducers.
* @param pigContext
@@ -268,5 +276,32 @@ public class SchemaTupleFrontend {
SchemaTupleFrontendGenHelper stfgh = new SchemaTupleFrontendGenHelper(pigContext, conf);
stfgh.generateAll(stf.getSchemasToGenerate());
stfgh.internalCopyAllGeneratedToDistributedCache();
+
+ Properties prop = pigContext.getProperties();
+ String value = conf.get(GENERATED_CLASSES_KEY);
+ if (value != null) {
+ prop.setProperty(GENERATED_CLASSES_KEY, value);
+ } else {
+ prop.remove(GENERATED_CLASSES_KEY);
+ }
+ value = conf.get(LOCAL_CODE_DIR);
+ if (value != null) {
+ prop.setProperty(LOCAL_CODE_DIR, value);
+ } else {
+ prop.remove(LOCAL_CODE_DIR);
+ }
+ }
+
+ private static PigContext pigContextToReset = null;
+
+ /**
+ * This method caches a PigContext object whose properties have had the
+ * SchemaTuple keys (GENERATED_CLASSES_KEY, LOCAL_CODE_DIR) set. This is necessary
+ * because in some cases, multiple cycles of jobs might run in the same JVM
+ * while sharing one PigContext object, so we want to make sure to undo any
+ * changes we have made to it the next time a Schema is registered after a reset.
+ */
+ protected static void lazyReset(PigContext pigContext) {
+ pigContextToReset = pigContext;
}
}
Modified: pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java Fri Oct 5 21:18:23 2012
@@ -253,7 +253,7 @@ public class FindQuantiles extends EvalF
probVec.set(l, new Float(0.0));
}
// for each partition that this sample item is present in,
- // compute the fraction of the total occurences for that
+ // compute the fraction of the total occurrences for that
// partition - this will be the probability with which we
// will pick this partition in the final sort reduce job
// for this sample item
Modified: pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java Fri Oct 5 21:18:23 2012
@@ -38,7 +38,9 @@ import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
/**
@@ -103,6 +105,8 @@ public class ReadToEndLoader extends Loa
*/
private InputFormat inputFormat = null;
+ private PigContext pigContext;
+
/**
* @param wrappedLoadFunc
* @param conf
@@ -120,6 +124,16 @@ public class ReadToEndLoader extends Loa
init();
}
+ public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
+ String inputLocation, int splitIndex, PigContext pigContext) throws IOException {
+ this.wrappedLoadFunc = wrappedLoadFunc;
+ this.inputLocation = inputLocation;
+ this.conf = conf;
+ this.curSplitIndex = splitIndex;
+ this.pigContext = pigContext;
+ init();
+ }
+
/**
* This constructor takes an array of split indexes (toReadSplitIdxs) of the
* splits to be read.
@@ -143,9 +157,14 @@ public class ReadToEndLoader extends Loa
@SuppressWarnings("unchecked")
private void init() throws IOException {
+ if (conf != null && pigContext != null) {
+ SchemaTupleBackend.initialize(conf, pigContext, true);
+ }
+
// make a copy so that if the underlying InputFormat writes to the
// conf, we don't affect the caller's copy
conf = new Configuration(conf);
+
// let's initialize the wrappedLoadFunc
Job job = new Job(conf);
wrappedLoadFunc.setLocation(inputLocation,
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Fri Oct 5 21:18:23 2012
@@ -131,7 +131,6 @@ public class LogToPhyTranslationVisitor
@Override
public void visit(LOLoad loLoad) throws FrontendException {
String scope = DEFAULT_SCOPE;
- // System.err.println("Entering Load");
// The last parameter here is set to true as we assume all files are
// splittable due to LoadStore Refactor
POLoad load = new POLoad(new OperatorKey(scope, nodeGen
@@ -160,7 +159,6 @@ public class LogToPhyTranslationVisitor
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
}
- // System.err.println("Exiting Load");
}
@Override
@@ -855,8 +853,20 @@ public class LogToPhyTranslationVisitor
for(boolean fl: flatten) {
flattenList.add(fl);
}
+ LogicalSchema logSchema = foreach.getSchema();
+ Schema schema = null;
+ if (logSchema != null) {
+ try {
+ schema = Schema.getPigSchema(new ResourceSchema(logSchema));
+ } catch (FrontendException e) {
+ throw new RuntimeException("LogicalSchema in foreach unable to be converted to Schema: " + logSchema, e);
+ }
+ }
+ if (schema != null) {
+ SchemaTupleFrontend.registerToGenerateIfPossible(schema, false, GenContext.FOREACH); //TODO may need to be appendable
+ }
POForEach poFE = new POForEach(new OperatorKey(scope, nodeGen
- .getNextNodeId(scope)), foreach.getRequestedParallelism(), innerPlans, flattenList);
+ .getNextNodeId(scope)), foreach.getRequestedParallelism(), innerPlans, flattenList, schema);
poFE.addOriginalLocation(foreach.getAlias(), foreach.getLocation());
poFE.setResultType(DataType.BAG);
logToPhyMap.put(foreach, poFE);
@@ -884,6 +894,7 @@ public class LogToPhyTranslationVisitor
/**
* This function takes in a List of LogicalExpressionPlan and converts them to
* a list of PhysicalPlans
+ *
* @param plans
* @return
* @throws FrontendException
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Fri Oct 5 21:18:23 2012
@@ -820,7 +820,6 @@ public class LogicalPlanBuilder {
funcSpec == null ?
new FuncSpec(PigStorage.class.getName()) :
funcSpec;
-
loFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(instantiatedFuncSpec);
String fileNameKey = QueryParserUtils.constructFileNameSignature(filename, instantiatedFuncSpec) + "_" + (loadIndex++);
absolutePath = fileNameMap.get(fileNameKey);
Modified: pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java (original)
+++ pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java Fri Oct 5 21:18:23 2012
@@ -41,14 +41,13 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
-import org.joda.time.DateTime;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -62,6 +61,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.util.Utils;
+import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
@@ -75,13 +75,11 @@ public class TestSchemaTuple {
@Before
public void perTestInitialize() {
- SchemaTupleFrontend.reset();
- SchemaTupleBackend.reset();
-
props = new Properties();
- props.setProperty(SchemaTupleBackend.SHOULD_GENERATE_KEY, "true");
+ props.setProperty(PigConfiguration.SHOULD_USE_SCHEMA_TUPLE, "true");
conf = ConfigurationUtil.toConfiguration(props);
+
pigContext = new PigContext(ExecType.LOCAL, props);
}
@@ -99,7 +97,7 @@ public class TestSchemaTuple {
udfSchema = Utils.getSchemaFromString("a:chararray,(a:chararray)");
isAppendable = false;
- context = GenContext.LOAD;
+ context = GenContext.FOREACH;
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
udfSchema = Utils.getSchemaFromString("a:int,(a:int,(a:int,(a:int,(a:int,(a:int,(a:int))))))");
@@ -180,11 +178,19 @@ public class TestSchemaTuple {
udfSchema = Utils.getSchemaFromString("int, m:map[(int,int,int)]");
SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+ isAppendable = false;
+ udfSchema = new Schema();
+ SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
+ isAppendable = false;
+ udfSchema = new Schema(new FieldSchema(null, DataType.BAG));
+ SchemaTupleFrontend.registerToGenerateIfPossible(udfSchema, isAppendable, context);
+
// this compiles and "ships"
SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
//backend
- SchemaTupleBackend.initialize(conf, ExecType.LOCAL);
+ SchemaTupleBackend.initialize(conf, pigContext);
udfSchema = Utils.getSchemaFromString("a:int");
isAppendable = false;
@@ -209,7 +215,7 @@ public class TestSchemaTuple {
udfSchema = Utils.getSchemaFromString("a:chararray,(a:chararray)");
isAppendable = false;
- context = GenContext.LOAD;
+ context = GenContext.FOREACH;
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
@@ -302,6 +308,16 @@ public class TestSchemaTuple {
udfSchema = Utils.getSchemaFromString("int, m:map[(int,int,int)]");
tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
putThroughPaces(tf, udfSchema, isAppendable);
+
+ isAppendable = false;
+ udfSchema = new Schema();
+ tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+ assertNull(tf);
+
+ isAppendable = false;
+ udfSchema = new Schema(new FieldSchema(null, DataType.BAG));
+ tf = SchemaTupleFactory.getInstance(udfSchema, isAppendable, context);
+ putThroughPaces(tf, udfSchema, isAppendable);
}
private void putThroughPaces(SchemaTupleFactory tf, Schema udfSchema, boolean isAppendable) throws Exception {
Modified: pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri Oct 5 21:18:23 2012
@@ -129,8 +129,6 @@ import org.junit.Test;
public class TestBuiltin {
- private String initString = "local";
-
PigServer pigServer;
// This should only be used when absolutely necessary -- eg, when using ReadToEndLoader.
@@ -304,7 +302,6 @@ public class TestBuiltin {
expectedMap.put("FloatAvgIntermediate", expectedMap.get("FloatSum"));
// set up input hash
- try{
inputMap.put("Integer", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), intInput));
inputMap.put("IntegerAsLong", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), intAsLong));
inputMap.put("Long", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), longInput));
@@ -315,11 +312,6 @@ public class TestBuiltin {
inputMap.put("ByteArrayAsDouble", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), baAsDouble));
inputMap.put("String", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), stringInput));
- }catch(ExecException e) {
- e.printStackTrace();
- }
-
-
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.UTC.getOffset(null)));
}
Modified: pig/trunk/test/org/apache/pig/test/TestCommit.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCommit.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCommit.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCommit.java Fri Oct 5 21:18:23 2012
@@ -17,53 +17,28 @@
*/
package org.apache.pig.test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.Random;
-import java.util.StringTokenizer;
-
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.apache.pig.ComparisonFunc;
-import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.builtin.Distinct;
import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.TextLoader;
-import org.apache.pig.data.*;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.PigFile;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.util.Pair;
-import junit.framework.TestCase;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Before;
+import org.junit.Test;
-@RunWith(JUnit4.class)
-public class TestCommit extends TestCase {
+public class TestCommit {
private PigServer pigServer;
- TupleFactory mTf = TupleFactory.getInstance();
+ private static final TupleFactory mTf = TupleFactory.getInstance();
@Before
- @Override
public void setUp() throws Exception{
pigServer = new PigServer(ExecType.LOCAL, new Properties());
}
@@ -95,11 +70,11 @@ public class TestCommit extends TestCase
Tuple t = iter.next();
count++;
if (count == 1) {
- assertTrue(t.get(0).equals(expected1.get(0)));
- assertTrue(t.get(1).equals(expected1.get(1)));
+ assertEquals(t.get(0), expected1.get(0));
+ assertEquals(t.get(1), expected1.get(1));
} else if (count == 2){
- assertTrue(t.get(0).equals(expected2.get(0)));
- assertTrue(t.get(1).equals(expected2.get(1)));
+ assertEquals(t.get(0), expected2.get(0));
+ assertEquals(t.get(1), expected2.get(1));
}
}
assertEquals(count, 2);
@@ -146,6 +121,7 @@ public class TestCommit extends TestCase
}
pigServer.deleteFile("testCheckin2-output.txt");
assertEquals(count, 2);
- assertTrue(contain1 && contain2);
+ assertTrue(contain1);
+ assertTrue(contain2);
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java Fri Oct 5 21:18:23 2012
@@ -18,7 +18,7 @@
package org.apache.pig.test;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.FileOutputStream;
@@ -32,35 +32,22 @@ import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.newplan.Operator;
-import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestExampleGenerator {
-
static PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
-
static int MAX = 100;
static String A, B;
static File fileA, fileB;
- {
- try {
- pigContext.connect();
- } catch (ExecException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
@BeforeClass
public static void oneTimeSetup() throws Exception {
-
+ pigContext.connect();
fileA = File.createTempFile("dataA", ".dat");
fileB = File.createTempFile("dataB", ".dat");
@@ -100,7 +87,7 @@ public class TestExampleGenerator {
pigserver.registerQuery(query);
Map<Operator, DataBag> derivedData = pigserver.getExamples("A");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@@ -116,7 +103,7 @@ public class TestExampleGenerator {
pigserver.registerQuery(query);
Map<Operator, DataBag> derivedData = pigserver.getExamples("B");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@@ -132,7 +119,7 @@ public class TestExampleGenerator {
pigserver.registerQuery(query);
Map<Operator, DataBag> derivedData = pigserver.getExamples("B");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -153,7 +140,7 @@ public class TestExampleGenerator {
pigserver.registerQuery(query);
Map<Operator, DataBag> derivedData = pigserver.getExamples("E");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@@ -167,7 +154,7 @@ public class TestExampleGenerator {
Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
//see PIG-2170
@@ -181,7 +168,7 @@ public class TestExampleGenerator {
Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -195,7 +182,7 @@ public class TestExampleGenerator {
Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -208,7 +195,7 @@ public class TestExampleGenerator {
Map<Operator, DataBag> derivedData = pigServer.getExamples("E");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -221,7 +208,7 @@ public class TestExampleGenerator {
Map<Operator, DataBag> derivedData = pigServer.getExamples("E");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -233,7 +220,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("C = cogroup A by (x, y), B by (x, y);");
Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -244,7 +231,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("C = cogroup A by x, B by x;");
Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -254,7 +241,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("B = group A by x;");
Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@@ -266,7 +253,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@@ -279,7 +266,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("D = foreach C generate group, COUNT(B);");
Map<Operator, DataBag> derivedData = pigServer.getExamples("D");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@@ -292,7 +279,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("D = UNION B, C;");
Map<Operator, DataBag> derivedData = pigServer.getExamples("D");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@@ -304,7 +291,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("C = foreach B { FA = filter A by y == 6; generate group, COUNT(FA);};");
Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@@ -316,7 +303,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("C = foreach B { FA = filter A by y == 6; DA = DISTINCT FA; generate group, COUNT(DA);};");
Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@@ -328,7 +315,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("C = union A, B;");
Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -338,7 +325,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("B = DISTINCT A;");
Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -349,7 +336,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("C = CROSS A, B;");
Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -359,7 +346,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("B = limit A 5;");
Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
//see PIG-2275
@@ -373,7 +360,7 @@ public class TestExampleGenerator {
Map<Operator, DataBag> derivedData = pigServer.getExamples("B");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -387,7 +374,7 @@ public class TestExampleGenerator {
Map<Operator, DataBag> derivedData = pigServer.getExamples("C");
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
@Test
@@ -405,7 +392,7 @@ public class TestExampleGenerator {
pigServer.registerQuery("store D into '" + out.getAbsolutePath() + "';");
Map<Operator, DataBag> derivedData = pigServer.getExamples(null);
- assertTrue(derivedData != null);
+ assertNotNull(derivedData);
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLoad.java?rev=1394816&r1=1394815&r2=1394816&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLoad.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLoad.java Fri Oct 5 21:18:23 2012
@@ -17,6 +17,9 @@
*/
package org.apache.pig.test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -30,8 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import junit.framework.Assert;
-
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
@@ -55,22 +56,17 @@ import org.apache.pig.parser.ParserExcep
import org.apache.pig.parser.QueryParserDriver;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.TestHelper;
-import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-@RunWith(JUnit4.class)
-public class TestLoad extends junit.framework.TestCase {
+public class TestLoad {
PigContext pc;
PigServer[] servers;
static MiniCluster cluster = MiniCluster.buildCluster();
- @Override
@Before
public void setUp() throws Exception {
FileLocalizer.deleteTempFiles();
@@ -80,11 +76,6 @@ public class TestLoad extends junit.fram
};
}
- @Override
- @After
- public void tearDown() throws Exception {
- }
-
@Test
public void testGetNextTuple() throws IOException {
pc = servers[0].getPigContext();
@@ -264,7 +255,7 @@ public class TestLoad extends junit.fram
LogicalPlan lp = Util.buildLp(servers[1], query);
LOLoad load = (LOLoad) lp.getSources().get(0);
nonDfsUrl = nonDfsUrl.replaceFirst("/$", "");
- Assert.assertEquals(nonDfsUrl, load.getFileSpec().getFileName());
+ assertEquals(nonDfsUrl, load.getFileSpec().getFileName());
}
@SuppressWarnings("unchecked")
@@ -297,7 +288,7 @@ public class TestLoad extends junit.fram
}
Collections.sort(expectedBasedOnNumberOfInputs);
Collections.sort(actual);
- Assert.assertEquals(expectedBasedOnNumberOfInputs, actual);
+ assertEquals(expectedBasedOnNumberOfInputs, actual);
} finally {
for(int i = 0; i < inputFileNames.length; i++) {
Util.deleteFile(pc, inputFileNames[i]);
@@ -325,24 +316,24 @@ public class TestLoad extends junit.fram
String query = "a = load '"+orig+"';";
LogicalPlan lp = builder.parse(query);
- Assert.assertTrue(lp.size()>0);
+ assertTrue(lp.size()>0);
Operator op = lp.getSources().get(0);
- Assert.assertTrue(op instanceof LOLoad);
+ assertTrue(op instanceof LOLoad);
LOLoad load = (LOLoad)op;
String p = load.getFileSpec().getFileName();
System.err.println("DEBUG: p:" + p + " expected:" + expected +", exectype:" + pc.getExecType());
if(noConversionExpected) {
- Assert.assertEquals(p, expected);
+ assertEquals(p, expected);
} else {
if (pc.getExecType() == ExecType.MAPREDUCE) {
- Assert.assertTrue(p.matches(".*hdfs://[0-9a-zA-Z:\\.]*.*"));
- Assert.assertEquals(p.replaceAll("hdfs://[0-9a-zA-Z:\\.]*/", "/"),
+ assertTrue(p.matches(".*hdfs://[0-9a-zA-Z:\\.]*.*"));
+ assertEquals(p.replaceAll("hdfs://[0-9a-zA-Z:\\.]*/", "/"),
expected);
} else {
- Assert.assertTrue(p.matches(".*file://[0-9a-zA-Z:\\.]*.*"));
- Assert.assertEquals(p.replaceAll("file://[0-9a-zA-Z:\\.]*/", "/"),
+ assertTrue(p.matches(".*file://[0-9a-zA-Z:\\.]*.*"));
+ assertEquals(p.replaceAll("file://[0-9a-zA-Z:\\.]*/", "/"),
expected);
}
}