Posted to commits@pig.apache.org by th...@apache.org on 2010/07/13 00:09:11 UTC
svn commit: r963504 [1/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/...
Author: thejas
Date: Mon Jul 12 22:09:09 2010
New Revision: 963504
URL: http://svn.apache.org/viewvc?rev=963504&view=rev
Log:
PIG-1472: Optimize serialization/deserialization between Map and Reduce and between MR jobs
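For context, the new serializer is exposed through the InterSedes interface added below. The following is a minimal sketch of a round trip through it, assuming only classes added or touched by this patch; the InterSedesRoundTrip driver class and the sample field values are illustrative and not part of the commit.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.pig.data.DataByteArray;
    import org.apache.pig.data.InterSedes;
    import org.apache.pig.data.InterSedesFactory;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class InterSedesRoundTrip {
        public static void main(String[] args) throws Exception {
            // Obtain the InterSedes implementation (BinInterSedes, per this patch).
            InterSedes sedes = InterSedesFactory.getInterSedesInstance();

            // Build a small tuple; BinInterSedes writes a one-byte type tag per datum
            // and picks compact encodings for small sizes and values
            // (TINYTUPLE, INTEGER_1, SMALLCHARARRAY, TINYBYTEARRAY below).
            Tuple t = TupleFactory.getInstance().newTuple(3);
            t.set(0, 1);
            t.set(1, "hello");
            t.set(2, new DataByteArray("raw bytes"));

            // Serialize to a byte buffer and read it back.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            sedes.writeDatum(new DataOutputStream(bos), t);
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bos.toByteArray()));
            Tuple copy = (Tuple) sedes.readDatum(in);
            System.out.println(copy);   // prints something like (1,hello,raw bytes)
        }
    }

The compact type tags defined in BinInterSedes below are what make this format smaller on the wire than the old DataReaderWriter encoding used by BinStorage.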
Added:
hadoop/pig/trunk/src/org/apache/pig/data/BinInterSedes.java
hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTuple.java
hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java
hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java
hadoop/pig/trunk/src/org/apache/pig/data/InterSedesFactory.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
hadoop/pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
hadoop/pig/trunk/src/org/apache/pig/builtin/COR.java
hadoop/pig/trunk/src/org/apache/pig/builtin/COV.java
hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java
hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java
hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageRecordWriter.java
hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPOSort.java
hadoop/pig/trunk/test/org/apache/pig/test/TestStreaming.java
hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java
hadoop/pig/trunk/test/org/apache/pig/test/TestTextDataParser.java
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC11.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC12.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC13.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC14.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC2.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC3.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC4.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC6.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC7.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC8.gld
hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/GenRandomData.java
hadoop/pig/trunk/tutorial/src/org/apache/pig/tutorial/NGramGenerator.java
hadoop/pig/trunk/tutorial/src/org/apache/pig/tutorial/ScoreGenerator.java
hadoop/pig/trunk/tutorial/src/org/apache/pig/tutorial/TutorialTest.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Jul 12 22:09:09 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1472: Optimize serialization/deserialization between Map and Reduce and between MR jobs (thejas)
+
PIG-1389: Implement Pig counter to track number of rows for each input files
(rding)
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Mon Jul 12 22:09:09 2010
@@ -54,7 +54,6 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.HJob;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -65,6 +64,7 @@ import org.apache.pig.experimental.logic
import org.apache.pig.experimental.logical.optimizer.UidStamper;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOConst;
import org.apache.pig.impl.logicalLayer.LODefine;
@@ -679,7 +679,7 @@ public class PigServer {
}
ExecJob job = store(id, FileLocalizer.getTemporaryPath(pigContext)
- .toString(), BinStorage.class.getName() + "()");
+ .toString(), InterStorage.class.getName() + "()");
// invocation of "execute" is synchronous!
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java Mon Jul 12 22:09:09 2010
@@ -19,24 +19,12 @@ package org.apache.pig.backend.hadoop;
import java.util.Map;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.DoubleWritable;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullableBag;
import org.apache.pig.impl.io.NullableBooleanWritable;
import org.apache.pig.impl.io.NullableBytesWritable;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Jul 12 22:09:09 2010
@@ -50,12 +50,12 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.builtin.BinStorage;
import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
import org.apache.pig.experimental.logical.optimizer.UidStamper;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -393,7 +393,7 @@ public class HExecutionEngine {
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
spec = new FileSpec(FileLocalizer.getTemporaryPath(
pigContext).toString(),
- new FuncSpec(BinStorage.class.getName()));
+ new FuncSpec(InterStorage.class.getName()));
str.setSFile(spec);
plan.addAsLeaf(str);
} else{
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Jul 12 22:09:09 2010
@@ -72,7 +72,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.DefaultIndexableLoader;
@@ -83,6 +82,7 @@ import org.apache.pig.impl.builtin.Poiss
import org.apache.pig.impl.builtin.RandomSampleLoader;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -594,7 +594,7 @@ public class MRCompiler extends PhyPlanV
*/
private FileSpec getTempFileSpec() throws IOException {
return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
- new FuncSpec(BinStorage.class.getName()));
+ new FuncSpec(InterStorage.class.getName()));
}
/**
@@ -2077,7 +2077,7 @@ public class MRCompiler extends PhyPlanV
// SampleLoader expects string version of FuncSpec
// as its first constructor argument.
- rslargs[0] = (new FuncSpec(BinStorage.class.getName())).toString();
+ rslargs[0] = (new FuncSpec(InterStorage.class.getName())).toString();
rslargs[1] = "100"; // The value is calculated based on the file size for skewed join
FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopStoreRemover.java Mon Jul 12 22:09:09 2010
@@ -41,7 +41,7 @@ import org.apache.pig.backend.hadoop.exe
*
* ------------- Split ---------
* | |
- * Store(BinStorage) Store(StoreFunc)
+ * Store(InterStorage) Store(StoreFunc)
*
* Followed by a load of the tmp store in a dependent MapReduceOper.
*
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Mon Jul 12 22:09:09 2010
@@ -24,7 +24,6 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.FuncSpec;
-import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -32,9 +31,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.RandomSampleLoader;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
@@ -151,7 +149,7 @@ public class SampleOptimizer extends MRO
}
POLoad sl = (POLoad)root;
if (loadFile.equals(sl.getLFile().getFileName()) &&
- "org.apache.pig.builtin.BinStorage".equals(sl.getLFile().getFuncName())) {
+ InterStorage.class.getName().equals(sl.getLFile().getFuncName())) {
succLoad = sl;
break;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Mon Jul 12 22:09:09 2010
@@ -21,22 +21,20 @@ package org.apache.pig.backend.hadoop.ex
import java.util.HashMap;
import java.util.Map;
-
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.io.NullablePartitionWritable;
-
import org.apache.pig.impl.util.Pair;
-import org.apache.pig.data.DefaultTupleFactory;
-import org.apache.pig.data.DataType;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
/**
@@ -62,7 +60,7 @@ public class SkewedPartitioner extends P
// for partition table, compute the index based on the sampler output
Pair <Integer, Integer> indexes;
Integer curIndex = -1;
- Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple(1);
+ Tuple keyTuple = TupleFactory.getInstance().newTuple(1);
// extract the key from nullablepartitionwritable
PigNullableWritable key = ((NullablePartitionWritable) wrappedKey).getKey();
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Mon Jul 12 22:09:09 2010
@@ -20,27 +20,25 @@ package org.apache.pig.backend.hadoop.ex
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pig.LoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.FindQuantiles;
-import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.io.NullableBytesWritable;
import org.apache.pig.impl.io.NullableDoubleWritable;
import org.apache.pig.impl.io.NullableFloatWritable;
@@ -50,10 +48,6 @@ import org.apache.pig.impl.io.NullableTe
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.io.ReadToEndLoader;
-import org.apache.pig.impl.util.ObjectSerializer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
implements Configurable {
@@ -109,7 +103,7 @@ public class WeightedRangePartitioner ex
conf.set("fs.hdfs.impl", configuration.get("fs.hdfs.impl"));
conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
- ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(),
+ ReadToEndLoader loader = new ReadToEndLoader(new InterStorage(),
conf, quantilesFile, 0);
DataBag quantilesList;
Tuple t = loader.getNext();
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon Jul 12 22:09:09 2010
@@ -42,10 +42,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
-import org.apache.pig.builtin.BinStorage;
import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.logicalLayer.*;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -1490,7 +1490,7 @@ public class LogToPhyTranslationVisitor
physOp.setAlias(split.getAlias());
FileSpec splStrFile;
try {
- splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(BinStorage.class.getName()));
+ splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(InterStorage.class.getName()));
} catch (IOException e1) {
byte errSrc = pc.getErrorSource();
int errCode = 0;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Mon Jul 12 22:09:09 2010
@@ -513,6 +513,8 @@ public class POPackage extends PhysicalO
} else {
bags[index].add(copy);
}
+ }else{
+ break;
}
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Mon Jul 12 22:09:09 2010
@@ -35,19 +35,19 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
-import org.apache.pig.data.DefaultTupleFactory;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -86,7 +86,7 @@ public class MapRedUtil {
conf.set("fs.hdfs.impl", PigMapReduce.sJobConf.get("fs.hdfs.impl"));
conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
- ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(), conf,
+ ReadToEndLoader loader = new ReadToEndLoader(new InterStorage(), conf,
keyDistFile, 0);
DataBag partitionList;
Tuple t = loader.getNext();
@@ -113,14 +113,14 @@ public class MapRedUtil {
if (idxTuple.size() > 3) {
// remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store
// it in the reducer map
- Tuple keyTuple = DefaultTupleFactory.getInstance().newTuple();
+ Tuple keyTuple = TupleFactory.getInstance().newTuple();
for (int i=0; i < idxTuple.size() - 2; i++) {
keyTuple.append(idxTuple.get(i));
}
keyT = (E) keyTuple;
} else {
if (keyType == DataType.TUPLE) {
- keyT = (E)DefaultTupleFactory.getInstance().newTuple(1);
+ keyT = (E)TupleFactory.getInstance().newTuple(1);
((Tuple)keyT).set(0,idxTuple.get(0));
} else {
keyT = (E) idxTuple.get(0);
@@ -151,7 +151,7 @@ public class MapRedUtil {
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
spec = new FileSpec(FileLocalizer.getTemporaryPath(
pigContext).toString(),
- new FuncSpec(BinStorage.class.getName()));
+ new FuncSpec(InterStorage.class.getName()));
str.setSFile(spec);
plan.addAsLeaf(str);
} else{
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Mon Jul 12 22:09:09 2010
@@ -69,10 +69,6 @@ import org.apache.pig.impl.util.LogUtils
public class BinStorage extends FileInputLoadFunc
implements LoadCaster, StoreFuncInterface, LoadMetadata {
-
- public static final int RECORD_1 = 0x01;
- public static final int RECORD_2 = 0x02;
- public static final int RECORD_3 = 0x03;
Iterator<Tuple> i = null;
private static final Log mLog = LogFactory.getLog(BinStorage.class);
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COR.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COR.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COR.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COR.java Mon Jul 12 22:09:09 2010
@@ -28,7 +28,7 @@ import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultBagFactory;
-import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -71,7 +71,7 @@ public class COR extends EvalFunc<DataBa
DataBag output = DefaultBagFactory.getInstance().newDefaultBag();
for(int i=0;i<input.size();i++){
for(int j=i+1;j<input.size();j++){
- Tuple temp = DefaultTupleFactory.getInstance().newTuple(3);
+ Tuple temp = TupleFactory.getInstance().newTuple(3);
try{
if(flag){
temp.set(0, schemaName.elementAt(i));
@@ -142,7 +142,7 @@ public class COR extends EvalFunc<DataBa
public Tuple exec(Tuple input) throws IOException {
if (input == null || input.size() == 0)
return null;
- Tuple output = DefaultTupleFactory.getInstance().newTuple(input.size() * 2);
+ Tuple output = TupleFactory.getInstance().newTuple(input.size() * 2);
try {
int k = -1;
for(int i=0;i<input.size();i++){
@@ -211,7 +211,7 @@ public class COR extends EvalFunc<DataBa
DataBag output = DefaultBagFactory.getInstance().newDefaultBag();
for(int i=0;i<totalSchemas;i++){
for(int j=i+1;j<totalSchemas;j++){
- Tuple result = DefaultTupleFactory.getInstance().newTuple(3);
+ Tuple result = TupleFactory.getInstance().newTuple(3);
try{
if(flag){
result.set(0, schemaName.elementAt(i));
@@ -250,9 +250,9 @@ public class COR extends EvalFunc<DataBa
* @throws IOException
*/
static protected Tuple combine(DataBag values) throws IOException {
- Tuple output = DefaultTupleFactory.getInstance().newTuple();
+ Tuple output = TupleFactory.getInstance().newTuple();
Tuple tuple; // copy of DataBag values
- tuple = DefaultTupleFactory.getInstance().newTuple(values.size());
+ tuple = TupleFactory.getInstance().newTuple(values.size());
int ct=0;
try{
@@ -281,7 +281,7 @@ public class COR extends EvalFunc<DataBa
sum_x_square += (Double)tem.get(3);
sum_y_square += (Double)tem.get(4);
}
- Tuple result = DefaultTupleFactory.getInstance().newTuple(5);
+ Tuple result = TupleFactory.getInstance().newTuple(5);
result.set(0, sum_x_y);
result.set(1, sum_x);
result.set(2, sum_y);
@@ -325,7 +325,7 @@ public class COR extends EvalFunc<DataBa
throw new IOException("Caught exception processing input", e);
}
- Tuple result = DefaultTupleFactory.getInstance().newTuple(5);
+ Tuple result = TupleFactory.getInstance().newTuple(5);
try{
result.set(0, sum_x_y);
result.set(1, sum_x);
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COV.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COV.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COV.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COV.java Mon Jul 12 22:09:09 2010
@@ -28,7 +28,7 @@ import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultBagFactory;
-import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -73,7 +73,7 @@ public class COV extends EvalFunc<DataBa
try{
for(int i=0;i<input.size();i++){
for(int j=i+1;j<input.size();j++){
- Tuple temp = DefaultTupleFactory.getInstance().newTuple(3);
+ Tuple temp = TupleFactory.getInstance().newTuple(3);
if(flag){
temp.set(0, schemaName.elementAt(i));
temp.set(1, schemaName.elementAt(j));
@@ -143,7 +143,7 @@ public class COV extends EvalFunc<DataBa
if (input == null || input.size() == 0)
return null;
- Tuple output = DefaultTupleFactory.getInstance().newTuple();
+ Tuple output = TupleFactory.getInstance().newTuple();
try {
for(int i=0;i<input.size();i++){
for(int j=i+1;j<input.size();j++){
@@ -208,7 +208,7 @@ public class COV extends EvalFunc<DataBa
}
for(int i=0;i<totalSchemas;i++){
for(int j=i+1;j<totalSchemas;j++){
- Tuple result = DefaultTupleFactory.getInstance().newTuple(3);
+ Tuple result = TupleFactory.getInstance().newTuple(3);
if(flag){
result.set(0, schemaName.elementAt(i));
result.set(1, schemaName.elementAt(j));
@@ -243,8 +243,8 @@ public class COV extends EvalFunc<DataBa
* @throws IOException
*/
static protected Tuple combine(DataBag values) throws IOException {
- Tuple tuple = DefaultTupleFactory.getInstance().newTuple(Double.valueOf(values.size()).intValue());
- Tuple output = DefaultTupleFactory.getInstance().newTuple();
+ Tuple tuple = TupleFactory.getInstance().newTuple(Double.valueOf(values.size()).intValue());
+ Tuple output = TupleFactory.getInstance().newTuple();
int ct=0;
for (Iterator<Tuple> it = values.iterator(); it.hasNext();ct++) {
@@ -268,7 +268,7 @@ public class COV extends EvalFunc<DataBa
sum_x+=(Double)tem.get(1);
sum_y+=(Double)tem.get(2);
}
- Tuple result = DefaultTupleFactory.getInstance().newTuple(3);
+ Tuple result = TupleFactory.getInstance().newTuple(3);
result.set(0, sum_x_y);
result.set(1, sum_x);
result.set(2, sum_y);
@@ -306,7 +306,7 @@ public class COV extends EvalFunc<DataBa
throw new IOException("Caught exception processing input", e);
}
- Tuple result = DefaultTupleFactory.getInstance().newTuple(3);
+ Tuple result = TupleFactory.getInstance().newTuple(3);
try{
result.set(0, sum_x_y);
result.set(1, sum_x);
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java Mon Jul 12 22:09:09 2010
@@ -36,7 +36,6 @@ import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultBagFactory;
-import org.apache.pig.data.DefaultTupleFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.LogUtils;
@@ -117,7 +116,7 @@ public class Utf8StorageConverter implem
return null;
}
}
- Tuple t = DefaultTupleFactory.getInstance().newTuple();
+ Tuple t = TupleFactory.getInstance().newTuple();
if (fieldSchema.getSchema()!=null && fieldSchema.getSchema().getFields().length!=0) {
ResourceFieldSchema[] fss = fieldSchema.getSchema().getFields();
// Interpret item inside tuple one by one based on the inner schema
Added: hadoop/pig/trunk/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/BinInterSedes.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/BinInterSedes.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/BinInterSedes.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * A class to handle reading and writing of intermediate results of data
+ * types. The serialization format used by this class is more efficient than
+ * what was used in DataReaderWriter.
+ * The format used by the functions in this class is subject to change, so it
+ * should be used ONLY to store intermediate results within a pig query.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class BinInterSedes implements InterSedes {
+
+ public static final byte BOOLEAN_TRUE = 0;
+ static final byte BOOLEAN_FALSE = 1;
+
+ public static final byte BYTE = 2;
+
+ public static final byte INTEGER = 3;
+ // since boolean is not supported yet (v0.7) as an external type,
+ // a lot of people use int instead,
+ // and some data with the old schema is likely to stay around for some time,
+ // so we optimize for that case as well
+ public static final byte INTEGER_0 = 4;
+ public static final byte INTEGER_1 = 5;
+ public static final byte INTEGER_INSHORT = 6;
+ public static final byte INTEGER_INBYTE = 7;
+
+
+ public static final byte LONG = 8;
+ public static final byte FLOAT = 9;
+ public static final byte DOUBLE = 10;
+
+ public static final byte BYTEARRAY = 11;
+ public static final byte SMALLBYTEARRAY = 12;
+ public static final byte TINYBYTEARRAY = 13;
+
+ public static final byte CHARARRAY = 14;
+ public static final byte SMALLCHARARRAY = 15;
+
+ public static final byte MAP = 16;
+ public static final byte SMALLMAP = 17;
+ public static final byte TINYMAP = 18;
+
+ public static final byte TUPLE = 19;
+ public static final byte SMALLTUPLE = 20;
+ public static final byte TINYTUPLE = 21;
+
+ public static final byte BAG = 22;
+ public static final byte SMALLBAG = 23;
+ public static final byte TINYBAG = 24;
+
+ public static final byte GENERIC_WRITABLECOMPARABLE = 25;
+ public static final byte INTERNALMAP = 26;
+
+ public static final byte NULL = 27;
+
+ private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+ private static BagFactory mBagFactory = BagFactory.getInstance();
+ static final int UNSIGNED_SHORT_MAX = 65535;
+ static final int UNSIGNED_BYTE_MAX = 255;
+ static final String UTF8 = "UTF-8";
+
+
+
+ private Tuple readTuple(DataInput in, byte type) throws IOException {
+ // Read the size.
+ int sz = getTupleSize(in, type);
+
+ Tuple t = mTupleFactory.newTuple(sz);
+ for (int i = 0; i < sz; i++) {
+ t.set(i, readDatum(in));
+ }
+ return t;
+
+ }
+
+
+
+ private int getTupleSize(DataInput in, byte type) throws IOException {
+ int sz ;
+ switch(type){
+ case TINYTUPLE:
+ sz = in.readUnsignedByte();
+ break;
+ case SMALLTUPLE:
+ sz = in.readUnsignedShort();
+ break;
+ case TUPLE:
+ sz = in.readInt();
+ break;
+ default: {
+ int errCode = 2112;
+ String msg = "Unexpected datatype " + type + " while reading tuple" +
+ "from binary file.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+ }
+ // if sz == 0, we construct an "empty" tuple -
+ // presumably the writer wrote an empty tuple!
+ if (sz < 0) {
+ throw new IOException("Invalid size " + sz + " for a tuple");
+ }
+ return sz;
+ }
+
+
+
+ private DataBag readBag(DataInput in, byte type) throws IOException {
+ DataBag bag = mBagFactory.newDefaultBag();
+ long size;
+ //determine size of bag
+ switch(type){
+ case TINYBAG:
+ size = in.readUnsignedByte();
+ break;
+ case SMALLBAG:
+ size = in.readUnsignedShort();
+ break;
+ case BAG:
+ size = in.readLong();
+ break;
+ default:
+ int errCode = 2219;
+ String msg = "Unexpected data while reading bag " +
+ "from binary file.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+
+ for (long i = 0; i < size; i++) {
+ try {
+ Object o = readDatum(in);
+ bag.add((Tuple)o);
+ } catch (ExecException ee) {
+ throw ee;
+ }
+ }
+ return bag;
+ }
+
+
+ private Map<String, Object> readMap(DataInput in, byte type) throws IOException {
+ int size ;
+ switch(type){
+ case TINYMAP:
+ size = in.readUnsignedByte();
+ break;
+ case SMALLMAP:
+ size = in.readUnsignedShort();
+ break;
+ case MAP:
+ size = in.readInt();
+ break;
+ default: {
+ int errCode = 2220;
+ String msg = "Unexpected data while reading map" +
+ "from binary file.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+ }
+ Map<String, Object> m = new HashMap<String, Object>(size);
+ for (int i = 0; i < size; i++) {
+ String key = (String)readDatum(in);
+ m.put(key, readDatum(in));
+ }
+ return m;
+ }
+
+ private InternalMap readInternalMap(DataInput in) throws IOException {
+ int size = in.readInt();
+ InternalMap m = new InternalMap(size);
+ for (int i = 0; i < size; i++) {
+ Object key = readDatum(in);
+ m.put(key, readDatum(in));
+ }
+ return m;
+ }
+
+ private static String readCharArray(DataInput in) throws IOException{
+ return in.readUTF();
+ }
+
+ private static String readBigCharArray(DataInput in) throws IOException{
+ int size = in.readInt();
+ byte[] ba = new byte[size];
+ in.readFully(ba);
+ return new String(ba, UTF8);
+ }
+
+ private Writable readWritable(DataInput in) throws IOException {
+ String className = (String) readDatum(in);
+ // create the writable class. It needs to have a default constructor
+ Class<?> objClass = null ;
+ try {
+ objClass = Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Could not find class " + className +
+ ", while attempting to de-serialize it ", e);
+ }
+ Writable writable = null;
+ try {
+ writable = (Writable) objClass.newInstance();
+ } catch (Exception e) {
+ String msg = "Could not create instance of class " + className +
+ ", while attempting to de-serialize it. (no default constructor ?)";
+ throw new IOException(msg, e);
+ }
+
+ //read the fields of the object from DataInput
+ writable.readFields(in);
+ return writable;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.InterSedes#readDatum(java.io.DataInput)
+ */
+ public Object readDatum(DataInput in) throws IOException, ExecException {
+ // Read the data type
+ byte b = in.readByte();
+ return readDatum(in, b);
+ }
+
+ private static Object readBytes(DataInput in, int size) throws IOException {
+ byte[] ba = new byte[size];
+ in.readFully(ba);
+ return new DataByteArray(ba);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.InterSedes#readDatum(java.io.DataInput, byte)
+ */
+ public Object readDatum(DataInput in, byte type) throws IOException, ExecException {
+ switch (type) {
+ case TUPLE:
+ case TINYTUPLE:
+ case SMALLTUPLE:
+ return readTuple(in, type);
+
+ case BAG:
+ case TINYBAG:
+ case SMALLBAG:
+ return readBag(in, type);
+
+ case MAP:
+ case TINYMAP:
+ case SMALLMAP:
+ return readMap(in, type);
+
+ case INTERNALMAP:
+ return readInternalMap(in);
+
+ case INTEGER_0:
+ return Integer.valueOf(0);
+ case INTEGER_1:
+ return Integer.valueOf(1);
+ case INTEGER_INBYTE:
+ return Integer.valueOf(in.readByte());
+ case INTEGER_INSHORT:
+ return Integer.valueOf(in.readShort());
+ case INTEGER:
+ return Integer.valueOf(in.readInt());
+
+ case LONG:
+ return Long.valueOf(in.readLong());
+
+ case FLOAT:
+ return Float.valueOf(in.readFloat());
+
+ case DOUBLE:
+ return Double.valueOf(in.readDouble());
+
+ case BOOLEAN_TRUE:
+ return Boolean.valueOf(true);
+
+ case BOOLEAN_FALSE:
+ return Boolean.valueOf(false);
+
+ case BYTE:
+ return Byte.valueOf(in.readByte());
+
+ case TINYBYTEARRAY :{
+ int size = in.readUnsignedByte();
+ return readBytes(in, size);
+ }
+
+ case SMALLBYTEARRAY :{
+ int size = in.readUnsignedShort();
+ return readBytes(in, size);
+ }
+
+ case BYTEARRAY: {
+ int size = in.readInt();
+ return readBytes(in, size);
+ }
+
+ case CHARARRAY:
+ return readBigCharArray(in);
+
+ case SMALLCHARARRAY:
+ return readCharArray(in);
+
+ case GENERIC_WRITABLECOMPARABLE :
+ return readWritable(in);
+
+ case NULL:
+ return null;
+
+ default:
+ throw new RuntimeException("Unexpected data type " + type +
+ " found in stream.");
+ }
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.InterSedes#writeDatum(java.io.DataOutput, java.lang.Object)
+ */
+ @SuppressWarnings("unchecked")
+ public void writeDatum(
+ DataOutput out,
+ Object val) throws IOException {
+ // Read the data type
+ byte type = DataType.findType(val);
+ switch (type) {
+ case DataType.TUPLE:
+ writeTuple(out, (Tuple)val);
+ break;
+
+ case DataType.BAG:
+ writeBag(out, (DataBag)val);
+ break;
+
+ case DataType.MAP: {
+ writeMap(out, (Map<String, Object>)val);
+
+ break;
+ }
+
+ case DataType.INTERNALMAP: {
+ out.writeByte(INTERNALMAP);
+ Map<Object, Object> m = (Map<Object, Object>)val;
+ out.writeInt(m.size());
+ Iterator<Map.Entry<Object, Object> > i =
+ m.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> entry = i.next();
+ writeDatum(out, entry.getKey());
+ writeDatum(out, entry.getValue());
+ }
+ break;
+ }
+
+ case DataType.INTEGER:
+ int i = (Integer)val;
+ if(i == 0){
+ out.writeByte(INTEGER_0);
+ }else if(i == 1){
+ out.writeByte(INTEGER_1);
+ }
+ else if(Byte.MIN_VALUE <= i && i <= Byte.MAX_VALUE ){
+ out.writeByte(INTEGER_INBYTE);
+ out.writeByte(i);
+ }
+ else if(Short.MIN_VALUE <= i && i <= Short.MAX_VALUE ){
+ out.writeByte(INTEGER_INSHORT);
+ out.writeShort(i);
+ }
+ else{
+ out.writeByte(INTEGER);
+ out.writeInt(i);
+ }
+
+
+ break;
+
+ case DataType.LONG:
+ out.writeByte(LONG);
+ out.writeLong((Long)val);
+ break;
+
+ case DataType.FLOAT:
+ out.writeByte(FLOAT);
+ out.writeFloat((Float)val);
+ break;
+
+ case DataType.DOUBLE:
+ out.writeByte(DOUBLE);
+ out.writeDouble((Double)val);
+ break;
+
+ case DataType.BOOLEAN:
+ if(((Boolean)val) == true)
+ out.writeByte(BOOLEAN_TRUE);
+ else
+ out.writeByte(BOOLEAN_FALSE);
+ break;
+
+ case DataType.BYTE:
+ out.writeByte(BYTE);
+ out.writeByte((Byte)val);
+ break;
+
+ case DataType.BYTEARRAY: {
+ DataByteArray bytes = (DataByteArray)val;
+ final int sz = bytes.size();
+ if(sz < UNSIGNED_BYTE_MAX){
+ out.writeByte(TINYBYTEARRAY);
+ out.writeByte(sz);
+ }
+ else if(sz < UNSIGNED_SHORT_MAX){
+ out.writeByte(SMALLBYTEARRAY);
+ out.writeShort(sz);
+ }
+ else {
+ out.writeByte(BYTEARRAY);
+ out.writeInt(sz);
+ }
+ out.write(bytes.mData);
+
+ break;
+
+ }
+
+ case DataType.CHARARRAY: {
+ String s = (String)val;
+ // a char can take up to 3 bytes in the modified utf8 encoding
+ // used by DataOutput.writeUTF, so use UNSIGNED_SHORT_MAX/3
+ if(s.length() < UNSIGNED_SHORT_MAX/3) {
+ out.writeByte(SMALLCHARARRAY);
+ out.writeUTF(s);
+ } else {
+ byte[] utfBytes = s.getBytes(UTF8);
+ int length = utfBytes.length;
+
+ out.writeByte(CHARARRAY);
+ out.writeInt(length);
+ out.write(utfBytes);
+ }
+ break;
+ }
+ case DataType.GENERIC_WRITABLECOMPARABLE :
+ out.writeByte(GENERIC_WRITABLECOMPARABLE);
+ //store the class name, so we know the class to create on read
+ writeDatum(out, val.getClass().getName());
+ Writable writable = (Writable)val;
+ writable.write(out);
+ break;
+
+ case DataType.NULL:
+ out.writeByte(NULL);
+ break;
+
+ default:
+ throw new RuntimeException("Unexpected data type " + type +
+ " found in stream.");
+ }
+ }
+
+ private void writeMap(DataOutput out, Map<String, Object> m)
+ throws IOException {
+
+ final int sz = m.size();
+ if(sz < UNSIGNED_BYTE_MAX){
+ out.writeByte(TINYMAP);
+ out.writeByte(sz);
+ }else if(sz < UNSIGNED_SHORT_MAX){
+ out.writeByte(SMALLMAP);
+ out.writeShort(sz);
+ }else {
+ out.writeByte(MAP);
+ out.writeInt(sz);
+ }
+ Iterator<Map.Entry<String, Object> > i =
+ m.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<String, Object> entry = i.next();
+ writeDatum(out, entry.getKey());
+ writeDatum(out, entry.getValue());
+ }
+ }
+
+
+
+ private void writeBag(DataOutput out, DataBag bag)
+ throws IOException {
+ // We don't care whether this bag was sorted or distinct because
+ // using the iterator to write it will guarantee those things come
+ // correctly. And on the other end there'll be no reason to waste
+ // time re-sorting or re-applying distinct.
+ final long sz = bag.size();
+ if(sz < UNSIGNED_BYTE_MAX){
+ out.writeByte(TINYBAG);
+ out.writeByte((int)sz);
+ }else if(sz < UNSIGNED_SHORT_MAX){
+ out.writeByte(SMALLBAG);
+ out.writeShort((int)sz);
+ }else {
+ out.writeByte(BAG);
+ out.writeLong(sz);
+ }
+
+ Iterator<Tuple> it = bag.iterator();
+ while (it.hasNext()) {
+ writeTuple(out, it.next());
+ }
+
+ }
+
+ private void writeTuple(DataOutput out, Tuple t) throws IOException {
+ final int sz = t.size();
+ if(sz < UNSIGNED_BYTE_MAX){
+ out.writeByte(TINYTUPLE);
+ out.writeByte(sz);
+ }else if(sz < UNSIGNED_SHORT_MAX){
+ out.writeByte(SMALLTUPLE);
+ out.writeShort(sz);
+ }else{
+ out.writeByte(TUPLE);
+ out.writeInt(sz);
+ }
+
+ for (int i = 0; i < sz; i++) {
+ writeDatum(out, t.get(i));
+ }
+ }
+
+
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.InterSedes#addColsToTuple(java.io.DataInput, org.apache.pig.data.Tuple)
+ */
+ @Override
+ public void addColsToTuple(DataInput in, Tuple t)
+ throws IOException {
+ byte type = in.readByte();
+ int sz = getTupleSize(in, type);
+ for (int i = 0; i < sz; i++) {
+ t.append(readDatum(in));
+ }
+ }
+
+
+}
+
Added: hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTuple.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTuple.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTuple.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.classification.InterfaceAudience;
+
+/**
+ * This tuple has a faster (de)serialization mechanism. It is to be used for
+ * storing intermediate data between Map and Reduce and between MR jobs.
+ * This is for internal pig use only. The serialization format can change, so
+ * do not use it for storing any persistent data (i.e. in load/store functions).
+ */
+@InterfaceAudience.Private
+public class BinSedesTuple extends DefaultTuple {
+
+ private static final long serialVersionUID = 1L;
+ private static final InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+
+
+ public void write(DataOutput out) throws IOException {
+ sedes.writeDatum(out, this);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+
+ // Clear our fields, in case we're being reused.
+ mFields.clear();
+ sedes.addColsToTuple(in, this);
+ }
+
+
+
+ /**
+ * Default constructor
+ */
+ BinSedesTuple() {
+ super();
+ }
+
+ /**
+ * Construct a tuple with a known number of fields. Package level so
+ * that callers cannot directly invoke it.
+ * @param size Number of fields to allocate in the tuple.
+ */
+ BinSedesTuple(int size) {
+ super(size);
+ }
+
+ /**
+ * Construct a tuple from an existing list of objects. Package
+ * level so that callers cannot directly invoke it.
+ * @param c List of objects to turn into a tuple.
+ */
+ BinSedesTuple(List<Object> c) {
+ super(c);
+ }
+
+ /**
+ * Construct a tuple from an existing list of objects. Package
+ * level so that callers cannot directly invoke it.
+ * @param c List of objects to turn into a tuple. This list will be kept
+ * as part of the tuple.
+ * @param junk Just used to differentiate from the constructor above that
+ * copies the list.
+ */
+ BinSedesTuple(List<Object> c, int junk) {
+ super(c, junk);
+ }
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.lang.Class;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.classification.InterfaceAudience;
+
+/**
+ * Default implementation of TupleFactory.
+ */
+@InterfaceAudience.Private
+public class BinSedesTupleFactory extends TupleFactory {
+ public Tuple newTuple() {
+ return new BinSedesTuple();
+
+ }
+
+ public Tuple newTuple(int size) {
+ return new BinSedesTuple(size);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Tuple newTuple(List c) {
+ return new BinSedesTuple(c);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Tuple newTupleNoCopy(List list) {
+ return new BinSedesTuple(list, 1);
+ }
+
+ public Tuple newTuple(Object datum) {
+ Tuple t = new BinSedesTuple(1);
+ try {
+ t.set(0, datum);
+ } catch (ExecException e) {
+ // The world has come to an end, we just allocated a tuple with one slot
+ // but we can't write to that slot.
+ throw new RuntimeException("Unable to write to field 0 in newly " +
+ "allocated tuple of size 1!", e);
+ }
+ return t;
+ }
+
+ public Class tupleClass() {
+ return BinSedesTuple.class;
+ }
+
+ BinSedesTupleFactory() {
+ }
+
+}
+
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java Mon Jul 12 22:09:09 2010
@@ -35,8 +35,10 @@ import org.apache.pig.classification.Int
import org.apache.pig.backend.executionengine.ExecException;
/**
- * A class to handle reading and writing of intermediate results of data
- * types. This class could also be used for storing permanent results.
+ * This class was used to handle reading and writing of intermediate
+ * results of data types. That functionality is now in {@link BinInterSedes}.
+ * This class can still be used for storing permanent results; it is used
+ * by BinStorage and Zebra through the DefaultTuple class.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
@@ -68,7 +70,16 @@ public class DataReaderWriter {
public static DataBag bytesToBag(DataInput in) throws IOException {
DataBag bag = mBagFactory.newDefaultBag();
- bag.readFields(in);
+ long size = in.readLong();
+
+ for (long i = 0; i < size; i++) {
+ try {
+ Object o = readDatum(in);
+ bag.add((Tuple)o);
+ } catch (ExecException ee) {
+ throw ee;
+ }
+ }
return bag;
}
@@ -202,16 +213,23 @@ public class DataReaderWriter {
byte type = DataType.findType(val);
switch (type) {
case DataType.TUPLE:
- // Because tuples are written directly by hadoop, the
- // tuple's write method needs to write the indicator byte.
- // So don't write the indicator byte here as it is for
- // everyone else.
- ((Tuple)val).write(out);
+ Tuple t = (Tuple)val;
+ out.writeByte(DataType.TUPLE);
+ int sz = t.size();
+ out.writeInt(sz);
+ for (int i = 0; i < sz; i++) {
+ DataReaderWriter.writeDatum(out, t.get(i));
+ }
break;
case DataType.BAG:
+ DataBag bag = (DataBag)val;
out.writeByte(DataType.BAG);
- ((DataBag)val).write(out);
+ out.writeLong(bag.size());
+ Iterator<Tuple> it = bag.iterator();
+ while (it.hasNext()) {
+ DataReaderWriter.writeDatum(out, it.next());
+ }
break;
case DataType.MAP: {
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Mon Jul 12 22:09:09 2010
@@ -47,10 +47,11 @@ import org.apache.commons.logging.LogFac
*/
public abstract class DefaultAbstractBag implements DataBag {
- private static final Log log = LogFactory.getLog(DataBag.class);
-
- private static PigLogger pigLogger = PhysicalOperator.getPigLogger();
+ private static final Log log = LogFactory.getLog(DataBag.class);
+ private static PigLogger pigLogger = PhysicalOperator.getPigLogger();
+
+ private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
// Container that holds the tuples. Actual object instantiated by
// subclasses.
protected Collection<Tuple> mContents;
@@ -239,16 +240,7 @@ public abstract class DefaultAbstractBag
* @throws IOException (passes it on from underlying calls).
*/
public void write(DataOutput out) throws IOException {
- // We don't care whether this bag was sorted or distinct because
- // using the iterator to write it will guarantee those things come
- // correctly. And on the other end there'll be no reason to waste
- // time re-sorting or re-applying distinct.
- out.writeLong(size());
- Iterator<Tuple> it = iterator();
- while (it.hasNext()) {
- Tuple item = it.next();
- item.write(out);
- }
+ sedes.writeDatum(out, this);
}
/**
@@ -261,7 +253,7 @@ public abstract class DefaultAbstractBag
for (long i = 0; i < size; i++) {
try {
- Object o = DataReaderWriter.readDatum(in);
+ Object o = sedes.readDatum(in);
add((Tuple)o);
} catch (ExecException ee) {
throw ee;
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultTuple.java Mon Jul 12 22:09:09 2010
@@ -31,15 +31,17 @@ import org.apache.pig.backend.executione
import org.apache.pig.impl.util.TupleFormat;
/**
- * A default implementation of Tuple. This class will be created by the
- * DefaultTupleFactory.
+ * This was the old default implementation of Tuple. The new default is
+ * {@link BinSedesTuple}.
+ * The Zebra and BinStorage load/store functions use the write(..) and readFields(..)
+ * methods here for (de)serialization.
*/
public class DefaultTuple implements Tuple {
protected boolean isNull = false;
private static final long serialVersionUID = 2L;
protected List<Object> mFields;
-
+
/**
* Default constructor. This constructor is public so that hadoop can call
* it directly. However, inside pig you should never be calling this
@@ -257,12 +259,7 @@ public class DefaultTuple implements Tup
}
public void write(DataOutput out) throws IOException {
- out.writeByte(DataType.TUPLE);
- int sz = size();
- out.writeInt(sz);
- for (int i = 0; i < sz; i++) {
- DataReaderWriter.writeDatum(out, mFields.get(i));
- }
+ DataReaderWriter.writeDatum(out, this);
}
public void readFields(DataInput in) throws IOException {
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultTupleFactory.java Mon Jul 12 22:09:09 2010
@@ -17,53 +17,21 @@
*/
package org.apache.pig.data;
-import java.lang.Class;
-import java.util.List;
-
-import org.apache.pig.backend.executionengine.ExecException;
-
/**
- * Default implementation of TupleFactory.
+ * This class was being used to create new tuple instances in some
+ * UDFs and other places in Pig code, even though the factory creation
+ * method (getInstance()) is static in TupleFactory
+ * (i.e. DefaultTupleFactory cannot override it).
+ * A typical call is: DefaultTupleFactory.getInstance().newTuple(..);
+ *
+ * DefaultTupleFactory is kept so that such external UDFs don't break.
+ * Don't use this class in new code; use TupleFactory directly instead.
*/
-public class DefaultTupleFactory extends TupleFactory {
- public Tuple newTuple() {
- return new DefaultTuple();
-
- }
-
- public Tuple newTuple(int size) {
- return new DefaultTuple(size);
- }
-
- @SuppressWarnings("unchecked")
- public Tuple newTuple(List c) {
- return new DefaultTuple(c);
- }
-
- @SuppressWarnings("unchecked")
- public Tuple newTupleNoCopy(List list) {
- return new DefaultTuple(list, 1);
- }
-
- public Tuple newTuple(Object datum) {
- Tuple t = new DefaultTuple(1);
- try {
- t.set(0, datum);
- } catch (ExecException e) {
- // The world has come to an end, we just allocated a tuple with one slot
- // but we can't write to that slot.
- throw new RuntimeException("Unable to write to field 0 in newly " +
- "allocated tuple of size 1!", e);
- }
- return t;
- }
- public Class tupleClass() {
- return DefaultTuple.class;
- }
-
- DefaultTupleFactory() {
- }
+/**
+ * @deprecated Use {@link TupleFactory}
+ */
+@Deprecated
+public class DefaultTupleFactory extends BinSedesTupleFactory {
}
-
Added: hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InterSedes.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * An interface for reading and writing intermediate results of Pig data
+ * types. The serialization format used by implementations of this interface
+ * is more efficient than the one used in DataReaderWriter.
+ * The format used by the functions in this interface is subject to change, so it
+ * should be used ONLY to store intermediate results within a Pig query.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface InterSedes {
+
+ /**
+ * Get the next object from DataInput in
+ * @param in
+ * @return Next object from DataInput in
+ * @throws IOException
+ * @throws ExecException
+ */
+ public Object readDatum(DataInput in)
+ throws IOException, ExecException;
+
+ /**
+ * Get the next object from DataInput in, whose type is given by the type argument.
+ * The type information has already been read from the DataInput.
+ * @param in
+ * @param type
+ * @return Next object from DataInput in
+ * @throws IOException
+ * @throws ExecException
+ */
+ public Object readDatum(DataInput in, byte type)
+ throws IOException, ExecException;
+
+ /**
+ * The next object has been determined to be a Tuple; add the columns
+ * that belong to it to the given tuple argument t
+ * @param in
+ * @param t
+ * @throws IOException
+ */
+ public void addColsToTuple(DataInput in, Tuple t)
+ throws IOException;
+
+ /**
+ * Write given object val to DataOutput out
+ * @param out
+ * @param val
+ * @throws IOException
+ */
+ public void writeDatum(DataOutput out, Object val)
+ throws IOException;
+}
+
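A minimal sketch of the intended usage of this interface, assuming the classes added in this patch are on the classpath; the class name and sample tuple below are illustrative only:

    import java.io.*;
    import org.apache.pig.data.InterSedes;
    import org.apache.pig.data.InterSedesFactory;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class InterSedesRoundTrip {
        public static void main(String[] args) throws Exception {
            InterSedes sedes = InterSedesFactory.getInterSedesInstance();
            Tuple t = TupleFactory.getInstance().newTuple(2);
            t.set(0, 42);
            t.set(1, "hello");

            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            sedes.writeDatum(new DataOutputStream(bos), t);    // serialize

            DataInput in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            Tuple copy = (Tuple) sedes.readDatum(in);          // deserialize
            System.out.println(copy);                          // (42,hello)
        }
    }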
Added: hadoop/pig/trunk/src/org/apache/pig/data/InterSedesFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InterSedesFactory.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InterSedesFactory.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InterSedesFactory.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+
+/**
+ * Used to get hold of the single instance of InterSedes.
+ * In the future, the InterSedes implementation that gets instantiated
+ * may be made configurable.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class InterSedesFactory {
+
+ private static InterSedes instance = null;
+ public static InterSedes getInterSedesInstance(){
+ if(instance == null){
+ instance = new BinInterSedes();
+ }
+ return instance;
+ }
+
+
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/TupleFactory.java Mon Jul 12 22:09:09 2010
@@ -74,7 +74,7 @@ public abstract class TupleFactory {
+ "tuple factory " + factoryName, e);
}
} else {
- gSelf = new DefaultTupleFactory();
+ gSelf = new BinSedesTupleFactory();
}
}
return gSelf;
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java Mon Jul 12 22:09:09 2010
@@ -44,11 +44,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.experimental.logical.expression.BagDereferenceExpression;
import org.apache.pig.experimental.logical.expression.ExpToPhyTranslationVisitor;
import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
import org.apache.pig.experimental.logical.expression.ProjectExpression;
@@ -62,8 +60,8 @@ import org.apache.pig.experimental.plan.
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -889,7 +887,7 @@ public class LogToPhyTranslationVisitor
physOp.setAlias(loSplit.getAlias());
FileSpec splStrFile;
try {
- splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(BinStorage.class.getName()));
+ splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(InterStorage.class.getName()));
} catch (IOException e1) {
byte errSrc = pc.getErrorSource();
int errCode = 0;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageRecordWriter.java?rev=963504&r1=963503&r2=963504&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageRecordWriter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/BinStorageRecordWriter.java Mon Jul 12 22:09:09 2010
@@ -23,6 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.data.DataReaderWriter;
+import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
/**
@@ -66,7 +68,7 @@ public class BinStorageRecordWriter exte
out.write(RECORD_1);
out.write(RECORD_2);
out.write(RECORD_3);
- t.write(out);
+ DataReaderWriter.writeDatum(out, t);
}
Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BinInterSedes;
+import org.apache.pig.data.InterSedes;
+import org.apache.pig.data.InterSedesFactory;
+import org.apache.pig.data.Tuple;
+
+/**
+ * A record reader used to read data written using {@link InterRecordWriter}.
+ * It uses the default InterSedes object for deserialization.
+ */
+public class InterRecordReader extends RecordReader<Text, Tuple> {
+
+ private long start;
+ private long pos;
+ private long end;
+ private BufferedPositionedInputStream in;
+ private Tuple value = null;
+ public static final int RECORD_1 = 0x01;
+ public static final int RECORD_2 = 0x02;
+ public static final int RECORD_3 = 0x03;
+ private DataInputStream inData = null;
+ private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+
+ public void initialize(InputSplit genericSplit,
+ TaskAttemptContext context) throws IOException {
+ FileSplit split = (FileSplit) genericSplit;
+ Configuration job = context.getConfiguration();
+ start = split.getStart();
+ end = start + split.getLength();
+ final Path file = split.getPath();
+
+ // open the file and seek to the start of the split
+ FileSystem fs = file.getFileSystem(job);
+ FSDataInputStream fileIn = fs.open(split.getPath());
+ if (start != 0) {
+ fileIn.seek(start);
+ }
+ in = new BufferedPositionedInputStream(fileIn, start);
+ inData = new DataInputStream(in);
+ }
+
+ public boolean nextKeyValue() throws IOException {
+ int b = 0;
+ // skip to next record
+ while (true) {
+ if (in == null || in.getPosition() >=end) {
+ return false;
+ }
+ // check if we saw RECORD_1 in our last attempt
+ // this can happen if we have the following
+ // sequence RECORD_1-RECORD_1-RECORD_2-RECORD_3
+ // After reading the second RECORD_1 in the above
+ // sequence, we should not look for RECORD_1 again
+ if(b != RECORD_1) {
+ b = in.read();
+ if(b != RECORD_1 && b != -1) {
+ continue;
+ }
+ if(b == -1) return false;
+ }
+ b = in.read();
+ if(b != RECORD_2 && b != -1) {
+ continue;
+ }
+ if(b == -1) return false;
+ b = in.read();
+ if(b != RECORD_3 && b != -1) {
+ continue;
+ }
+ if(b == -1) return false;
+ b = in.read();
+ if(b != BinInterSedes.TINYTUPLE &&
+ b != BinInterSedes.SMALLTUPLE &&
+ b != BinInterSedes.TUPLE &&
+ b != -1) {
+ continue;
+ }
+ if(b == -1) return false;
+ break;
+ }
+ try {
+ // if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER
+ // sequence - lets now read the contents of the tuple
+ value = (Tuple)sedes.readDatum(inData, (byte)b);
+ return true;
+ } catch (ExecException ee) {
+ throw ee;
+ }
+
+ }
+
+ @Override
+ public Text getCurrentKey() {
+ // the key is always null since we don't really have a key for each
+ // input record
+ return null;
+ }
+
+ @Override
+ public Tuple getCurrentValue() {
+ return value;
+ }
+
+ /**
+ * Get the progress within the split
+ */
+ public float getProgress() {
+ if (start == end) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (pos - start) / (float)(end - start));
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (in != null) {
+ in.close();
+ }
+ }
+}
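The resync logic in nextKeyValue() above scans forward for the three marker bytes while being careful not to consume a RECORD_1 that might start the next candidate sequence. A simplified, hypothetical sketch of that scanning idea (not the class's actual API); the helper name is illustrative only:

    import java.io.IOException;
    import java.io.InputStream;

    public class MarkerScanSketch {
        // Scan until the bytes 0x01 0x02 0x03 appear in order; return true if
        // found, false on end of stream. Mirrors the overlap handling in
        // InterRecordReader.nextKeyValue().
        static boolean skipToNextRecordMarker(InputStream in) throws IOException {
            int b = in.read();
            while (b != -1) {
                if (b == 0x01) {
                    b = in.read();
                    if (b == 0x02) {
                        b = in.read();
                        if (b == 0x03) {
                            return true;   // positioned just after RECORD_3
                        }
                    }
                    // keep b: it may itself be the RECORD_1 of the real marker
                    continue;
                }
                b = in.read();
            }
            return false;
        }
    }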
Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java?rev=963504&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java Mon Jul 12 22:09:09 2010
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.data.InterSedes;
+import org.apache.pig.data.InterSedesFactory;
+import org.apache.pig.data.Tuple;
+
+
+/**
+ * A record writer used to write data that can be read by {@link InterRecordReader}.
+ * It uses the default InterSedes object for serialization.
+ */
+public class InterRecordWriter extends
+ RecordWriter<org.apache.hadoop.io.WritableComparable, Tuple> {
+
+ public static final int RECORD_1 = 0x01;
+ public static final int RECORD_2 = 0x02;
+ public static final int RECORD_3 = 0x03;
+ private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
+ /**
+ * the outputstream to write out on
+ */
+ private DataOutputStream out;
+
+ /**
+ * @param out the output stream to write serialized records to
+ */
+ public InterRecordWriter(DataOutputStream out) {
+ this.out = out;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public void close(TaskAttemptContext arg0) throws IOException,
+ InterruptedException {
+ out.close();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
+ */
+ @Override
+ public void write(WritableComparable wc, Tuple t) throws IOException,
+ InterruptedException {
+ // we really only want to write the tuple (value) out here
+ out.write(RECORD_1);
+ out.write(RECORD_2);
+ out.write(RECORD_3);
+ sedes.writeDatum(out, t);
+
+ }
+
+}
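Together, InterRecordWriter and InterRecordReader define the record layout in Pig's temporary files: the three sync-marker bytes followed by a tuple serialized through the default InterSedes, whose first byte is one of the tuple markers the reader checks for. A hedged sketch of what write() puts on the stream for a single record; the class name and sample tuple are illustrative only:

    import java.io.*;
    import org.apache.pig.data.InterSedes;
    import org.apache.pig.data.InterSedesFactory;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class InterRecordLayoutSketch {
        public static void main(String[] args) throws Exception {
            InterSedes sedes = InterSedesFactory.getInterSedesInstance();
            Tuple t = TupleFactory.getInstance().newTuple(1);
            t.set(0, "payload");

            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.write(0x01);            // RECORD_1
            out.write(0x02);            // RECORD_2
            out.write(0x03);            // RECORD_3
            sedes.writeDatum(out, t);   // tuple marker byte + tuple body

            // The reader scans for 0x01 0x02 0x03, checks that the next byte is
            // one of the tuple markers, and passes that byte plus the stream to
            // sedes.readDatum(in, type).
            System.out.println(bos.size() + " bytes for one record");
        }
    }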