You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/13 23:11:24 UTC
svn commit: r656011 [1/5] - in /incubator/pig/branches/types: ./ lib/
src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/data/ src/org/apache/pig/impl/io/
src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/i...
Author: gates
Date: Tue May 13 14:11:21 2008
New Revision: 656011
URL: http://svn.apache.org/viewvc?rev=656011&view=rev
Log:
Shravan's jumbo patch that includes most of the map-reduce layer, plus the cogroup, split, and union physical operators.
Added:
incubator/pig/branches/types/lib/hadoop16.jar (with props)
incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobCreationException.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigInputFormat.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapOnly.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigOutputFormat.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigSplit.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/RunnableReporter.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROperPlan.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGlobalRearrange.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSplit.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POCast.java
incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java
incubator/pig/branches/types/test/org/apache/pig/test/TestUnion.java
incubator/pig/branches/types/test/org/apache/pig/test/data/
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC11.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC12.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC13.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC14.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC2.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC3.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC4.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC5.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC6.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC7.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC8.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC9.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/jsTst3
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/jsTst4
incubator/pig/branches/types/test/org/apache/pig/test/data/InputFiles/
incubator/pig/branches/types/test/org/apache/pig/test/data/InputFiles/jsTst1.txt
incubator/pig/branches/types/test/org/apache/pig/test/data/InputFiles/jsTst2.txt
incubator/pig/branches/types/test/org/apache/pig/test/data/InputFiles/jsTst5.txt
incubator/pig/branches/types/test/org/apache/pig/test/data/InputFiles/passwd
Removed:
incubator/pig/branches/types/lib/hadoop14.jar
incubator/pig/branches/types/lib/hadoop15.jar
Modified:
incubator/pig/branches/types/build.xml
incubator/pig/branches/types/src/org/apache/pig/ExecType.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/arithmeticOperators/Add.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/arithmeticOperators/Divide.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/arithmeticOperators/Mod.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/arithmeticOperators/Multiply.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/arithmeticOperators/Subtract.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/EqualToExpr.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GTOrEqualToExpr.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/LTOrEqualToExpr.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/LessThanExpr.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/NotEqualToExpr.java
incubator/pig/branches/types/src/org/apache/pig/impl/plan/CompilationMessageCollector.java
incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java
incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java
incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidationException.java
incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidationExecutor.java
incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidator.java
incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java
incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
incubator/pig/branches/types/test/org/apache/pig/test/OrdAsc.java
incubator/pig/branches/types/test/org/apache/pig/test/OrdDesc.java
incubator/pig/branches/types/test/org/apache/pig/test/OrdDescNumeric.java
incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
incubator/pig/branches/types/test/org/apache/pig/test/TestGrunt.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPigServer.java
incubator/pig/branches/types/test/org/apache/pig/test/TypeGraphPrinter.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Tue May 13 14:11:21 2008
@@ -40,7 +40,7 @@
<property name="dist.dir" value="${build.dir}/${final.name}" />
<property name="build.encoding" value="ISO-8859-1" />
<!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
- <property name="hadoop.jarfile" value="hadoop15.jar" />
+ <property name="hadoop.jarfile" value="hadoop16.jar" />
<!-- jar names. TODO we might want to use the svn reversion name in the name in case it is a dev version -->
<property name="output.jarfile" value="${build.dir}/${final.name}.jar" />
@@ -68,7 +68,7 @@
<!-- ====================================================== -->
<!-- setup the classpath -->
<path id="classpath">
- <fileset file="${lib.dir}/hadoop15.jar" />
+ <fileset file="${lib.dir}/${hadoop.jarfile}" />
<fileset file="${lib.dir}/javacc.jar" />
<fileset file="${lib.dir}/jsch-0.1.33.jar" />
<fileset file="${lib.dir}/junit-4.1.jar" />
@@ -144,12 +144,13 @@
**/test/FakeFSOutputStream.java, **/test/TestPackage.java, **/test/TestForEach.java,
**/test/TestLocalRearrange.java, **/test/TestPOUserFunc.java,
**/test/TestPODistinct.java, **/test/TestPOSort.java,
- **/test/TestSchema.java, **/test/TestLogicalPlanBuilder.java,
- **/test/FakeFSInputStream.java, **/test/Util.java,
+ **/test/TestSchema.java, **/test/TestLogicalPlanBuilder.java,**/test/TestUnion.java, **/test/TestMRCompiler.java,
+ **/test/FakeFSInputStream.java, **/test/Util.java, **/test/TestJobSubmission.java,
+ **/test/TestLocalJobSubmission.java,
**/logicalLayer/*.java, **/logicalLayer/parser/NodeIdGenerator.java,
**/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
**/physicalLayer/topLevelOperators/**/*.java, **/physicalLayer/plans/*.java,
- **/physicalLayer/Result.java,
+ **/physicalLayer/Result.java, **/mapReduceLayer/**/*.java,
**/physicalLayer/POStatus.java, **/bzip2r/*.java,
**/validators/*.java , **/test/TestInputOutputFileValidator.java ,
**/test/TestTypeCheckingValidator.java,
@@ -261,11 +262,16 @@
<include name="**/TestStore.java" />
<include name="**/TestPackage.java" />
<include name="**/TestLocalRearrange.java" />
- <include name="**/TestForEach.java" />
+ <include name="**/TestForEach.java" />
+ <include name="**/TestUnion.java" />
+ <include name="**/TestMRCompiler.java" />
+ <include name="**/TestJobSubmission.java" />
<include name="**/TestInputOutputFileValidator.java" />
<include name="**/TestTypeCheckingValidator.java" />
<include name="**/TestSchema.java" />
- <include name="**/TestLogicalPlanBuilder.java" />
+ <include name="**/TestLogicalPlanBuilder.java" />
+ <include name="**/TestLocalJobSubmission.java" />
+
<!--
<include name="**/*Test*.java" />
<exclude name="**/TestLargeFile.java" />
Added: incubator/pig/branches/types/lib/hadoop16.jar
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/lib/hadoop16.jar?rev=656011&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/pig/branches/types/lib/hadoop16.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Modified: incubator/pig/branches/types/src/org/apache/pig/ExecType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/ExecType.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/ExecType.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/ExecType.java Tue May 13 14:11:21 2008
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.pig;
/**
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue May 13 14:11:21 2008
@@ -154,11 +154,13 @@
catch (IOException e) {
throw new ExecException("Failed to create DataStorage", e);
}
-
- log.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
+
+ String jobTrackerName = conf.get("mapred.job.tracker").toString();
+ log.info("Connecting to map-reduce job tracker at: " + jobTrackerName);
try {
- jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+ if(!jobTrackerName.equalsIgnoreCase("local"))
+ jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
JobSubmissionProtocol.versionID,
JobTracker.getAddress(conf.getConfiguration()),
conf.getConfiguration());
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Tue May 13 14:11:21 2008
@@ -24,6 +24,13 @@
import java.util.Map;
import java.util.TreeMap;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.pig.backend.executionengine.ExecException;
/**
@@ -111,6 +118,201 @@
}
}
+ static BooleanWritable boolWrit = new BooleanWritable();
+ static BytesWritable bytesWrit = new BytesWritable();
+ static Text stringWrit = new Text();
+ static FloatWritable floatWrit = new FloatWritable();
+ static IntWritable intWrit = new IntWritable();
+ static LongWritable longWrit = new LongWritable();
+ static DataBag defDB = BagFactory.getInstance().newDefaultBag();
+ static Tuple defTup = TupleFactory.getInstance().newTuple();
+
+ public static WritableComparable getWritableComparableTypes(Object o) throws ExecException{
+ WritableComparable wcKey = null;
+ Map<Byte, String> typeToName = genTypeToNameMap();
+ byte type = DataType.findType(o);
+ switch (type) {
+ case DataType.BAG:
+ wcKey = (DataBag) o;
+ break;
+ case DataType.BOOLEAN:
+ boolWrit.set((Boolean)o);
+ wcKey = boolWrit;
+ break;
+ case DataType.BYTEARRAY:
+ byte[] dbaBytes = ((DataByteArray) o).get();
+ bytesWrit.set(dbaBytes,0,dbaBytes.length);
+ wcKey = bytesWrit;
+ break;
+ case DataType.CHARARRAY:
+ stringWrit.set((String) o);
+ wcKey = stringWrit;
+ break;
+ // Currently Hadoop does not have a DoubleWritable
+ // case DataType.DOUBLE:
+ case DataType.FLOAT:
+ floatWrit.set((Float) o);
+ wcKey = floatWrit;
+ break;
+ case DataType.INTEGER:
+ intWrit.set((Integer) o);
+ wcKey = intWrit;
+ break;
+ case DataType.LONG:
+ longWrit.set((Long) o);
+ wcKey = longWrit;
+ break;
+ case DataType.TUPLE:
+ wcKey = (Tuple) o;
+ break;
+// case DataType.MAP:
+ // Hmm, This is problematic
+ // Need a deep clone to convert a Map into
+ // MapWritable
+ // wcKey = new MapWritable();
+// break;
+ default:
+ throw new ExecException("The type "
+ + typeToName.get(type)
+ + " cannot be collected as a Key type");
+ }
+ return wcKey;
+ }
+
+ public static WritableComparable getWritableComparableTypes(byte type) throws ExecException{
+ WritableComparable wcKey = null;
+ Map<Byte, String> typeToName = genTypeToNameMap();
+ switch (type) {
+ case DataType.BAG:
+ wcKey = defDB;
+ break;
+ case DataType.BOOLEAN:
+ wcKey = boolWrit;
+ break;
+ case DataType.BYTEARRAY:
+ wcKey = bytesWrit;
+ break;
+ case DataType.CHARARRAY:
+ wcKey = stringWrit;
+ break;
+ // Currently Hadoop does not have a DoubleWritable
+ // case DataType.DOUBLE:
+ case DataType.FLOAT:
+ wcKey = floatWrit;
+ break;
+ case DataType.INTEGER:
+ wcKey = intWrit;
+ break;
+ case DataType.LONG:
+ wcKey = longWrit;
+ break;
+ case DataType.TUPLE:
+ wcKey = defTup;
+ break;
+// case DataType.MAP:
+ // Hmm, This is problematic
+ // Need a deep clone to convert a Map into
+ // MapWritable
+ // wcKey = new MapWritable();
+// break;
+ default:
+ throw new ExecException("The type "
+ + typeToName.get(type)
+ + " cannot be collected as a Key type");
+ }
+ return wcKey;
+ }
+
+ /*public static WritableComparable getWritableComparableTypes(Object o) throws ExecException{
+ WritableComparable wcKey = null;
+ Map<Byte, String> typeToName = genTypeToNameMap();
+ byte type = DataType.findType(o);
+ switch (type) {
+ case DataType.BAG:
+ wcKey = (DataBag) o;
+ break;
+ case DataType.BOOLEAN:
+ wcKey = new BooleanWritable((Boolean) o);
+ break;
+ case DataType.BYTEARRAY:
+ wcKey = new BytesWritable(((DataByteArray) o)
+ .get());
+ break;
+ case DataType.CHARARRAY:
+ wcKey = new Text((String) o);
+ break;
+ // Currently Hadoop does not have a DoubleWritable
+ // case DataType.DOUBLE:
+ case DataType.FLOAT:
+ wcKey = new FloatWritable((Float) o);
+ break;
+ case DataType.INTEGER:
+ wcKey = new IntWritable((Integer) o);
+ break;
+ case DataType.LONG:
+ wcKey = new LongWritable((Long) o);
+ break;
+ case DataType.TUPLE:
+ wcKey = (Tuple) o;
+ break;
+ case DataType.MAP:
+ // Hmm, This is problematic
+ // Need a deep clone to convert a Map into
+ // MapWritable
+ // wcKey = new MapWritable();
+ break;
+ default:
+ throw new ExecException("The type "
+ + typeToName.get(type)
+ + " cannot be collected as a Key type");
+ }
+ return wcKey;
+ }
+
+ public static WritableComparable getWritableComparableTypes(byte type) throws ExecException{
+ WritableComparable wcKey = null;
+ Map<Byte, String> typeToName = genTypeToNameMap();
+ switch (type) {
+ case DataType.BAG:
+ wcKey = DefaultBagFactory.getInstance().newDefaultBag();
+ break;
+ case DataType.BOOLEAN:
+ wcKey = new BooleanWritable();
+ break;
+ case DataType.BYTEARRAY:
+ wcKey = new BytesWritable();
+ break;
+ case DataType.CHARARRAY:
+ wcKey = new Text();
+ break;
+ // Currently Hadoop does not have a DoubleWritable
+ // case DataType.DOUBLE:
+ case DataType.FLOAT:
+ wcKey = new FloatWritable();
+ break;
+ case DataType.INTEGER:
+ wcKey = new IntWritable();
+ break;
+ case DataType.LONG:
+ wcKey = new LongWritable();
+ break;
+ case DataType.TUPLE:
+ wcKey = TupleFactory.getInstance().newTuple();
+ break;
+ case DataType.MAP:
+ // Hmm, This is problematic
+ // Need a deep clone to convert a Map into
+ // MapWritable
+ // wcKey = new MapWritable();
+ break;
+ default:
+ throw new ExecException("The type "
+ + typeToName.get(type)
+ + " cannot be collected as a Key type");
+ }
+ return wcKey;
+ }*/
+
public static int numTypes(){
byte[] types = genAllTypes();
return types.length;
@@ -145,6 +347,24 @@
public static String findTypeName(Object o) {
return findTypeName(findType(o));
}
+
+ public static Object convertToPigType(WritableComparable key) {
+ if ((key instanceof DataBag) || (key instanceof Tuple))
+ return key;
+ if (key instanceof BooleanWritable)
+ return ((BooleanWritable) key).get();
+ if (key instanceof BytesWritable)
+ return new DataByteArray(((BytesWritable) key).get());
+ if (key instanceof Text)
+ return ((Text) key).toString();
+ if (key instanceof FloatWritable)
+ return ((FloatWritable) key).get();
+ if (key instanceof IntWritable)
+ return ((IntWritable) key).get();
+ if (key instanceof LongWritable)
+ return ((LongWritable) key).get();
+ return null;
+ }
/**
* Get the type name from the type byte code
@@ -572,5 +792,4 @@
default :return true ;
}
}
-
}
Added: incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java?rev=656011&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java Tue May 13 14:11:21 2008
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+/**
+ * A tuple paired with the operators to which
+ * it needs to be attached
+ *
+ */
+public class TargetedTuple implements Tuple {
+ private Tuple t;
+ // The list of operators to which this tuple
+ // has to be attached as input.
+ public List<OperatorKey> targetOps = null;
+
+ public TargetedTuple() {
+ }
+
+ public TargetedTuple(Tuple t, List<OperatorKey> targetOps) {
+ this.t = t;
+ this.targetOps = targetOps;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(super.toString());
+ sb.append("[");
+ for (OperatorKey target : targetOps) {
+ sb.append(target.toString());
+ sb.append(",");
+ }
+ sb.replace(sb.length() - 1, sb.length(), "]");
+ return sb.toString();
+ }
+
+ public void write(DataOutput out) throws IOException {
+ t.write(out);
+ out.writeInt(targetOps.size());
+ for (OperatorKey target : targetOps) {
+ // Ideally we would call target.write(out), but OperatorKey
+ // does not support that yet, so it is serialized by hand here.
+ out.writeInt(target.scope.length());
+ out.writeBytes(target.scope);
+ out.writeLong(target.id);
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ t.readFields(in);
+ targetOps = new ArrayList<OperatorKey>();
+ int size = in.readInt();
+ for (int i = 0; i < size; i++) {
+ OperatorKey target = new OperatorKey();
+ // Ideally we would call target.read(in), but OperatorKey
+ // does not support that yet, so it is deserialized by hand here.
+ int scopeSz = in.readInt();
+ byte[] buf = new byte[scopeSz];
+ in.readFully(buf);
+ target.scope = new String(buf);
+ target.id = in.readLong();
+ targetOps.add(target);
+ }
+ }
+
+ public Tuple toTuple() {
+ return t;
+ }
+
+ public List<OperatorKey> getTargetOps() {
+ return targetOps;
+ }
+
+ public void setTargetOps(List<OperatorKey> targetOps) {
+ this.targetOps = targetOps;
+ }
+
+ public void append(Object val) {
+ t.append(val);
+ }
+
+ public Object get(int fieldNum) throws ExecException {
+ return t.get(fieldNum);
+ }
+
+ public List<Object> getAll() {
+ return t.getAll();
+ }
+
+ public long getMemorySize() {
+ return t.getMemorySize();
+ }
+
+ public byte getType(int fieldNum) throws ExecException {
+ return t.getType(fieldNum);
+ }
+
+ public boolean isNull(int fieldNum) throws ExecException {
+ return t.isNull(fieldNum);
+ }
+
+ public void reference(Tuple t) {
+ this.t = t;
+ }
+
+ public void set(int fieldNum, Object val) throws ExecException {
+ t.set(fieldNum, val);
+ }
+
+ public int size() {
+ return t.size();
+ }
+
+ public String toDelimitedString(String delim) throws ExecException {
+ return t.toDelimitedString(delim);
+ }
+
+ public int compareTo(Object o) {
+ return t.compareTo(o);
+ }
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Tue May 13 14:11:21 2008
@@ -194,18 +194,60 @@
return new DataStorageInputStreamIterator(elements);
}
+ private static InputStream openLFSFile(ElementDescriptor elem) throws IOException{
+ ElementDescriptor[] elements = null;
+
+ if (elem.exists()) {
+ try {
+ if(! elem.getDataStorage().isContainer(elem.toString())) {
+ return elem.open();
+ }
+ }
+ catch (DataStorageException e) {
+ throw WrappedIOException.wrap("Failed to determine if elem=" + elem + " is container", e);
+ }
+
+ ArrayList<ElementDescriptor> arrayList =
+ new ArrayList<ElementDescriptor>();
+ Iterator<ElementDescriptor> allElements =
+ ((ContainerDescriptor)elem).iterator();
+
+ while (allElements.hasNext()) {
+ ElementDescriptor ed = allElements.next();
+ int li = ed.toString().lastIndexOf(File.separatorChar);
+ String fName = ed.toString().substring(li+1);
+ if(fName.charAt(0)=='.')
+ continue;
+ arrayList.add(ed);
+ }
+
+ elements = new ElementDescriptor[ arrayList.size() ];
+ arrayList.toArray(elements);
+
+ } else {
+ // It might be a glob
+ if (!globMatchesFiles(elem, elem.getDataStorage())) {
+ throw new IOException(elem.toString() + " does not exist");
+ }
+ }
+
+ return new DataStorageInputStreamIterator(elements);
+ }
+
static public InputStream open(String fileSpec, PigContext pigContext) throws IOException {
fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
if (!fileSpec.startsWith(LOCAL_PREFIX)) {
init(pigContext);
- ElementDescriptor elem = pigContext.getDfs().
- asElement(fullPath(fileSpec, pigContext));
+ ElementDescriptor elem = pigContext.getDfs().asElement(fullPath(fileSpec, pigContext));
return openDFSFile(elem);
}
else {
fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
//buffering because we only want buffered streams to be passed to load functions.
- return new BufferedInputStream(new FileInputStream(fileSpec));
+ /*return new BufferedInputStream(new FileInputStream(fileSpec));*/
+ init(pigContext);
+ ElementDescriptor elem = pigContext.getLfs().asElement(fullPath(fileSpec, pigContext));
+ return openLFSFile(elem);
}
}
@@ -387,4 +429,12 @@
}
}
+ public static Random getR() {
+ return r;
+ }
+
+ public static void setR(Random r) {
+ FileLocalizer.r = r;
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java Tue May 13 14:11:21 2008
@@ -22,7 +22,7 @@
import java.io.Serializable;
-public class OperatorKey implements Serializable {
+public class OperatorKey implements Serializable, Comparable<OperatorKey> {
private static final long serialVersionUID = 1L;
public String scope;
@@ -66,4 +66,18 @@
public long getId() {
return id;
}
+
+ public int compareTo(OperatorKey o) {
+ int scCmp = scope.compareTo(o.scope);
+ if(scCmp!=0)
+ return scCmp;
+ else{
+ if(id>o.id)
+ return 1;
+ else if(id==o.id)
+ return 0;
+ else
+ return -1;
+ }
+ }
}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java?rev=656011&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java Tue May 13 14:11:21 2008
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+/**
+ * This is the compiler class that takes an MROperPlan and converts
+ * it into a JobControl object with the relevant dependency info
+ * maintained. The JobControl Object is made up of Jobs each of
+ * which has a JobConf. The MapReduceOper corresponds to a Job
+ * and the getJobConf method returns the JobConf that is configured
+ * as per the MapReduceOper
+ */
+public class JobControlCompiler{
+ MROperPlan plan;
+ Configuration conf;
+ PigContext pigContext;
+
+ /**
+ * The map between MapReduceOpers and their corresponding Jobs
+ */
+ Map<OperatorKey, Job> seen = new Hashtable<OperatorKey, Job>();
+
+ /**
+ * Top level compile method that issues a call to the recursive
+ * compile method.
+ * @param plan - The MROperPlan to be compiled
+ * @param grpName - The name given to the JobControl
+ * @param conf - The Configuration object having the various properties
+ * @param pigContext - PigContext passed on from the execution engine
+ * @return JobControl object
+ * @throws JobCreationException
+ */
+ public JobControl compile(MROperPlan plan, String grpName, Configuration conf, PigContext pigContext) throws JobCreationException{
+ this.plan = plan;
+ this.conf = conf;
+ this.pigContext = pigContext;
+ JobControl jobCtrl = new JobControl(grpName);
+
+ List<MapReduceOper> leaevs = new ArrayList<MapReduceOper>();
+ leaevs = plan.getLeaves();
+
+ for (MapReduceOper mro : leaevs) {
+ jobCtrl.addJob(compile(mro,jobCtrl));
+ }
+ return jobCtrl;
+ }
+
+ /**
+ * The recursive compilation method that works by doing a depth first
+ * traversal of the MROperPlan. Compiles a Job for the input MapReduceOper
+ * with the dependencies maintained in jobCtrl
+ * @param mro - Input MapReduceOper for which a Job needs to be compiled
+ * @param jobCtrl - The running JobControl object to maintain dependencies b/w jobs
+ * @return Job corresponding to the input mro
+ * @throws JobCreationException
+ */
+ private Job compile(MapReduceOper mro, JobControl jobCtrl) throws JobCreationException {
+ List<MapReduceOper> pred = new ArrayList<MapReduceOper>();
+ pred = plan.getPredecessors(mro);
+
+ JobConf currJC = null;
+
+ try{
+ if(pred==null || pred.size()<=0){
+ //No dependencies! Create the JobConf
+ //Construct the Job object with it and return
+ Job ret = null;
+ if(seen.containsKey(mro.getOperatorKey()))
+ ret = seen.get(mro.getOperatorKey());
+ else{
+ currJC = getJobConf(mro, conf, pigContext);
+ ret = new Job(currJC,null);
+ seen.put(mro.getOperatorKey(), ret);
+ }
+ return ret;
+ }
+
+ //Has dependencies. So compile all the inputs
+ List compiledInputs = new ArrayList(pred.size());
+
+ for (MapReduceOper oper : pred) {
+ Job ret = null;
+ if(seen.containsKey(oper.getOperatorKey()))
+ ret = seen.get(oper.getOperatorKey());
+ else{
+ ret = compile(oper, jobCtrl);
+ jobCtrl.addJob(ret);
+ seen.put(oper.getOperatorKey(),ret);
+ }
+ compiledInputs.add(ret);
+ }
+ //Get JobConf for the current MapReduceOper
+ currJC = getJobConf(mro, conf, pigContext);
+
+ //Create a new Job with the obtained JobConf
+ //and the compiled inputs as dependent jobs
+ return new Job(currJC,(ArrayList)compiledInputs);
+ }catch(Exception e){
+ JobCreationException jce = new JobCreationException(e);
+ throw jce;
+ }
+ }
+
+ /**
+ * The method that creates the JobConf corresponding to a MapReduceOper
+ * Doesn't support Sort or Distinct jobs yet. The assumption is that
+ * every MapReduceOper will have a load and a store. The JobConf removes
+ * the load operator and serializes the input filespec so that PigInputFormat can
+ * take over the creation of splits. It also removes the store operator
+ * and serializes the output filespec so that PigOutputFormat can take over
+ * record writing. The remaining portion of the map plan and reduce plans are
+ * serialized and stored for the PigMapReduce or PigMapOnly objects to take over
+ * the actual running of the plans.
+ * The Mapper & Reducer classes and the required key value formats are set.
+ * Checks if this is a map only job and uses PigMapOnly class as the mapper
+ * and uses PigMapReduce otherwise.
+ * If it is a Map Reduce job, it is bound to have a package operator. It is removed from
+ * the reduce plan and serialized so that the PigMapReduce class can use it to package
+ * the indexed tuples received by the reducer.
+ * @param mro - The MapReduceOper for which the JobConf is required
+ * @param conf - the Configuration object from which JobConf is built
+ * @param pigContext - The PigContext passed on from execution engine
+ * @return JobConf corresponding to mro
+ * @throws JobCreationException
+ */
+ private JobConf getJobConf(MapReduceOper mro, Configuration conf, PigContext pigContext) throws JobCreationException{
+ JobConf jobConf = new JobConf(conf);
+ ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
+ ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
+
+ //Set the User Name for this job. This will be
+ //used as the working directory
+ String user = System.getProperty("user.name");
+ jobConf.setUser(user != null ? user : "Pigster");
+
+ //Process the POLoads
+ List<PhysicalOperator> lds = getRoots(mro.mapPlan);
+ if(lds!=null && lds.size()>0){
+ for (PhysicalOperator operator : lds) {
+ POLoad ld = (POLoad)operator;
+ //Store the inp filespecs
+ inp.add(ld.getLFile());
+ //Store the target operators for tuples read
+ //from this input
+ List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
+ List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
+ if(ldSucs!=null){
+ for (PhysicalOperator operator2 : ldSucs) {
+ ldSucKeys.add(operator2.getOperatorKey());
+ }
+ }
+ inpTargets.add(ldSucKeys);
+ //Remove the POLoad from the plan
+ mro.mapPlan.remove(ld);
+ }
+ }
+ try{
+ //Create the jar of all functions required
+ File submitJarFile = File.createTempFile("Job", ".jar");
+ FileOutputStream fos = new FileOutputStream(submitJarFile);
+ JarManager.createJar(fos, mro.UDFs, pigContext);
+
+ //Start setting the JobConf properties
+ jobConf.setJar(submitJarFile.getPath());
+ jobConf.set("pig.inputs", ObjectSerializer.serialize(inp));
+ jobConf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
+ jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
+
+ jobConf.setInputFormat(PigInputFormat.class);
+ jobConf.setOutputFormat(PigOutputFormat.class);
+
+ //Process POStore and remove it from the plan
+ POStore st = null;
+ if(mro.reducePlan.isEmpty()){
+ st = (POStore) mro.mapPlan.getLeaves().get(0);
+ mro.mapPlan.remove(st);
+ }
+ else{
+ st = (POStore) mro.reducePlan.getLeaves().get(0);
+ mro.reducePlan.remove(st);
+ }
+ //set out filespecs
+ String outputPath = st.getSFile().getFileName();
+ String outputFuncSpec = st.getSFile().getFuncSpec();
+ jobConf.setOutputPath(new Path(outputPath));
+ jobConf.set("pig.storeFunc", outputFuncSpec);
+
+ if(mro.reducePlan.isEmpty()){
+ //MapOnly Job
+ jobConf.setMapperClass(PigMapOnly.Map.class);
+ jobConf.setNumReduceTasks(0);
+ jobConf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+ }
+ else{
+ //Map Reduce Job
+ //Process the POPackage operator and remove it from the reduce plan
+ POPackage pack = (POPackage)mro.reducePlan.getRoots().get(0);
+ mro.reducePlan.remove(pack);
+ jobConf.setMapperClass(PigMapReduce.Map.class);
+ jobConf.setReducerClass(PigMapReduce.Reduce.class);
+ jobConf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+ jobConf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
+ jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
+ jobConf.setOutputKeyClass(DataType.getWritableComparableTypes(pack.getKeyType()).getClass());
+ jobConf.setOutputValueClass(IndexedTuple.class);
+ }
+
+ return jobConf;
+ }catch(Exception e){
+ JobCreationException jce = new JobCreationException(e);
+ throw jce;
+ }
+ }
+
+ private List<PhysicalOperator> getRoots(PhysicalPlan<PhysicalOperator> php){
+ List<PhysicalOperator> ret = new ArrayList<PhysicalOperator>();
+ for (PhysicalOperator operator : php.getRoots()) {
+ ret.add(operator);
+ }
+ return ret;
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobCreationException.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobCreationException.java?rev=656011&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobCreationException.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobCreationException.java Tue May 13 14:11:21 2008
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
+public class JobCreationException extends FrontendException{
+
+ private static final long serialVersionUID = 1L;
+
+ public JobCreationException() {
+ super();
+ }
+
+ public JobCreationException(String arg0, Throwable arg1) {
+ super(arg0, arg1);
+ }
+
+ public JobCreationException(String arg0) {
+ super(arg0);
+ }
+
+ public JobCreationException(Throwable arg0) {
+ super(arg0);
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=656011&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Tue May 13 14:11:21 2008
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+
+/**
+ * The compiler that compiles a given physical plan
+ * into a DAG of MapReduce operators which can then
+ * be converted into the JobControl structure.
+ *
+ * Is implemented as a visitor of the PhysicalPlan it
+ * is compiling.
+ *
+ * Currently supports all operators except the MR Sort
+ * operator
+ *
+ * Uses a predecessor based depth first traversal.
+ * To compile an operator, first compiles
+ * the predecessors into MapReduce Operators and tries to
+ * merge the current operator into one of them. The goal
+ * being to keep the number of MROpers to a minimum.
+ *
+ * It also merges multiple Map jobs, created by compiling
+ * the inputs individually, into a single job.
+ *
+ * Only in case of blocking operators and splits, a new
+ * MapReduce operator is started using a store-load combination
+ * to connect the two operators. Whenever this happens
+ * care is taken to add the MROper into the MRPlan and connect it
+ * appropriately.
+ *
+ */
+public class MRCompiler extends PhyPlanVisitor<PhysicalOperator, PhysicalPlan<PhysicalOperator>> {
+
+ private Log log = LogFactory.getLog(getClass());
+
+ PigContext pigContext;
+
+ //The plan that is being compiled
+ PhysicalPlan<PhysicalOperator> plan;
+
+ //The plan of MapReduce Operators
+ MROperPlan MRPlan;
+
+ //The current MapReduce Operator
+ //that is being compiled
+ MapReduceOper curMROp;
+
+ //The output of compiling the inputs
+ MapReduceOper[] compiledInputs = null;
+
+ //The split operators seen till now. If not
+ //maintained they will haunt you.
+ //During the traversal a split is the only
+ //operator that can be revisited from a different
+ //path. So this map stores the split job. So
+ //whenever we hit the split, we create a new MROper
+ //and connect the split job using load-store and also
+ //in the MRPlan
+ Map<OperatorKey, MapReduceOper> splitsSeen;
+
+ NodeIdGenerator nig;
+
+ private String scope;
+
+ private Random r;
+
+ public MRCompiler(PhysicalPlan<PhysicalOperator> plan) {
+ this(plan,null);
+ }
+
+ public MRCompiler(PhysicalPlan<PhysicalOperator> plan,
+ PigContext pigContext) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan<PhysicalOperator>>(plan));
+ this.plan = plan;
+ this.pigContext = pigContext;
+ splitsSeen = new HashMap<OperatorKey, MapReduceOper>();
+ MRPlan = new MROperPlan();
+ nig = NodeIdGenerator.getGenerator();
+ scope = "MRCompiler";
+ r = new Random(1331);
+ FileLocalizer.setR(r);
+ }
+
+ /**
+ * Used to get the compiled plan
+ * @return
+ */
+ public MROperPlan getMRPlan() {
+ return MRPlan;
+ }
+
+ /**
+ * Used to get the plan that was compiled
+ * @return
+ */
+ public PhysicalPlan<PhysicalOperator> getPlan() {
+ return plan;
+ }
+
+ /**
+ * The front-end method that the user calls to compile
+ * the plan. Assumes that all submitted plans have a Store
+ * operator as the leaf.
+ * @return
+ * @throws IOException
+ * @throws PlanException
+ * @throws VisitorException
+ */
+ public MROperPlan compile() throws IOException, PlanException, VisitorException {
+ List<PhysicalOperator> leaves = plan.getLeaves();
+ /*for (PhysicalOperator operator : leaves) {
+ compile(operator);
+ if (!curMROp.isMapDone()) {
+ curMROp.setMapDone(true);
+ } else if (!curMROp.isReduceDone()) {
+ curMROp.setReduceDone(true);
+ }
+ }*/
+ POStore store = (POStore)leaves.get(0);
+ compile(store);
+
+ return MRPlan;
+ }
+
+ /**
+ * Compiles the plan below op into a MapReduce Operator
+ * and stores it in curMROp.
+ * @param op
+ * @throws IOException
+ * @throws PlanException
+ * @throws VisitorException
+ */
+ private void compile(PhysicalOperator op) throws IOException,
+ PlanException, VisitorException {
+ //An artifact of the Visitor. Need to save
+ //this so that it is not overwritten.
+ MapReduceOper[] prevCompInp = compiledInputs;
+
+ //Compile each predecessor into the MROper and
+ //store them away so that we can use them for compiling
+ //op.
+ List<PhysicalOperator> predecessors = plan.getPredecessors(op);
+ if (predecessors != null && predecessors.size() > 0) {
+ Collections.sort(predecessors);
+ compiledInputs = new MapReduceOper[predecessors.size()];
+ int i = -1;
+ for (PhysicalOperator pred : predecessors) {
+ if(pred instanceof POSplit && splitsSeen.containsKey(pred.getOperatorKey())){
+ compiledInputs[++i] = startNew(((POSplit)pred).getSplitStore(), splitsSeen.get(pred.getOperatorKey()));
+ continue;
+ }
+ compile(pred);
+ compiledInputs[++i] = curMROp;
+ }
+ } else {
+ //No predecessors. Mostly a load. But this is where
+ //we start. We create a new MROp and add its first
+ //operator op. Also this should be added to the MRPlan.
+ curMROp = getMROp();
+ curMROp.mapPlan.add(op);
+ MRPlan.add(curMROp);
+ return;
+ }
+
+ //Now we have the inputs compiled. Do something
+ //with the input oper op.
+ op.visit(this);
+ compiledInputs = prevCompInp;
+ }
+
+ private MapReduceOper getMROp(){
+ return new MapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ }
+
+ private POLoad getLoad(){
+ POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ ld.setPc(pigContext);
+ return ld;
+ }
+
+ private POStore getStore(){
+ POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ st.setPc(pigContext);
+ return st;
+ }
+
+ /**
+ * A map MROper is an MROper whose map plan is still open
+ * for taking more non-blocking operators.
+ * A reduce MROper is an MROper whose map plan is done but
+ * the reduce plan is open for taking more non-blocking opers.
+ *
+ * Used for compiling non-blocking operators. The logic here
+ * is simple. If there is a single input, just push the operator
+ * into whichever phase is open. Otherwise, we merge the compiled
+ * inputs into a list of MROpers where the first oper is the merged
+ * oper consisting of all map MROpers and the rest are reduce MROpers
+ * as reduce plans can't be merged.
+ * Then we add the input oper op into the merged map MROper's map plan
+ * as a leaf and connect the reduce MROpers using store-load combinations
+ * to the input operator which is the leaf. Also care is taken to
+ * connect the MROpers according to the dependencies.
+ * @param op
+ * @throws PlanException
+ * @throws IOException
+ * @throws IOException
+ */
+ private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{
+ if (compiledInputs.length == 1) {
+ //For speed
+ MapReduceOper mro = compiledInputs[0];
+ if (!mro.isMapDone()) {
+ mro.mapPlan.addAsLeaf(op);
+ } else if (mro.isMapDone() && !mro.isReduceDone()) {
+ mro.reducePlan.addAsLeaf(op);
+ } else {
+ log.warn("Both map and reduce phases have been done. This is unexpected while compiling!");
+ }
+ curMROp = mro;
+ } else {
+ List<MapReduceOper> mergedPlans = merge(compiledInputs);
+
+ //The first MROper is always the merged map MROper
+ MapReduceOper mro = mergedPlans.remove(0);
+ //Push the input operator into the merged map MROper
+ mro.mapPlan.addAsLeaf(op);
+
+ //Connect all the reduce MROpers
+ if(mergedPlans.size()>0)
+ connRedOper(mergedPlans, mro);
+
+ //return the compiled MROper
+ curMROp = mro;
+ }
+ }
+
+ /**
+ * Used for compiling blocking operators. If there is a single input
+ * and its map phase is still open, then close it so that further
+ * operators can be compiled into the reduce phase. If its reduce phase
+ * is open, add a store and close it. Start a new map MROper into which
+ * further operators can be compiled.
+ *
+ * If there are multiple inputs, the logic
+ * is to merge all map MROpers into one map MROper and retain
+ * the reduce MROpers. Since the operator is blocking, it has
+ * to be a Global Rearrange, at least for now. This operator need not
+ * be inserted into our plan as it is implemented by hadoop.
+ * But this creates the map-reduce boundary. So the merged map MROper
+ * is closed and its reduce phase is started. Depending on the number
+ * of reduce MROpers and the number of pipelines in the map MROper
+ * a Union operator is inserted whenever necessary. This also leads to the
+ * possibility of empty map plans. So have to be careful while handling
+ * it in the PigMapReduce class. If there are no map
+ * plans, then a new one is created as a side effect of the merge
+ * process. If there are no reduce MROpers, and only a single pipeline
+ * in the map, then no union oper is added. Otherwise a Union oper is
+ * added to the merged map MROper to which all the reduce MROpers
+ * are connected by store-load combinations. Care is taken
+ * to connect the MROpers in the MRPlan.
+ * @param op
+ * @throws IOException
+ * @throws PlanException
+ */
+ private void blocking(PhysicalOperator op) throws IOException, PlanException{
+ if(compiledInputs.length==1){
+ MapReduceOper mro = compiledInputs[0];
+ if (!mro.isMapDone()) {
+ mro.setMapDoneSingle(true);
+ curMROp = mro;
+ }
+ else if(mro.isMapDone() && !mro.isReduceDone()){
+ FileSpec fSpec = getTempFileSpec();
+
+ POStore st = getStore();
+ st.setSFile(fSpec);
+ mro.reducePlan.addAsLeaf(st);
+ mro.setReduceDone(true);
+ curMROp = startNew(fSpec, mro);
+ curMROp.setMapDone(true);
+ }
+ }
+ else{
+ List<MapReduceOper> mergedPlans = merge(compiledInputs);
+ MapReduceOper mro = mergedPlans.remove(0);
+
+ if(mergedPlans.size()>0)
+ mro.setMapDoneMultiple(true);
+ else
+ mro.setMapDoneSingle(true);
+
+ // Connect all the reduce MROpers
+ if(mergedPlans.size()>0)
+ connRedOper(mergedPlans, mro);
+ curMROp = mro;
+ }
+ }
+
+ /**
+ * Connect the reduce MROpers to the leaf node in the map MROper mro
+ * by adding appropriate loads
+ * @param mergedPlans - The list of reduce MROpers
+ * @param mro - The map MROper
+ * @throws PlanException
+ * @throws IOException
+ */
+ private void connRedOper(List<MapReduceOper> mergedPlans, MapReduceOper mro) throws PlanException, IOException{
+ PhysicalOperator leaf = null;
+ List<PhysicalOperator> leaves = mro.mapPlan.getLeaves();
+ if(leaves!=null && leaves.size()>0)
+ leaf = leaves.get(0);
+
+ for (MapReduceOper mmro : mergedPlans) {
+ mmro.setReduceDone(true);
+ FileSpec fileSpec = getTempFileSpec();
+ POLoad ld = getLoad();
+ ld.setLFile(fileSpec);
+ POStore str = getStore();
+ str.setSFile(fileSpec);
+ mmro.reducePlan.addAsLeaf(str);
+ mro.mapPlan.add(ld);
+ if(leaf!=null)
+ mro.mapPlan.connect(ld, leaf);
+ MRPlan.connect(mmro, mro);
+ }
+ }
+
+ /**
+ * Used to compile a split operator. The logic is to
+ * close the split job by replacing the split oper by
+ * a store and creating a new Map MROper and returning
+ * that as the current MROper into which other operators
+ * would be compiled. The new MROper would be connected
+ * to the split job by load-store. Also add the split oper
+ * to the splitsSeen map.
+ * @param op
+ * @throws IOException
+ * @throws PlanException
+ */
+ private void split(POSplit op) throws PlanException{
+ MapReduceOper mro = compiledInputs[0];
+ FileSpec fSpec = op.getSplitStore();
+ POStore str = getStore();
+ str.setSFile(fSpec);
+ if (!mro.isMapDone()) {
+ mro.mapPlan.addAsLeaf(str);
+ mro.setMapDoneSingle(true);
+ } else if (mro.isMapDone() && !mro.isReduceDone()) {
+ mro.reducePlan.addAsLeaf(str);
+ mro.setReduceDone(true);
+ } else {
+ log.warn("Both map and reduce phases have been done. This is unexpected while compiling!");
+ }
+ splitsSeen.put(op.getOperatorKey(), mro);
+ curMROp = startNew(fSpec, mro);
+ }
+
+ /**
+ * Starts a new MROper and connects it to the old
+ * one by load-store. The assumption is that the
+ * store is already inserted into the old MROper.
+ * @param fSpec
+ * @param old
+ * @return
+ * @throws IOException
+ * @throws PlanException
+ */
+ private MapReduceOper startNew(FileSpec fSpec, MapReduceOper old) throws PlanException{
+ POLoad ld = getLoad();
+ ld.setLFile(fSpec);
+ MapReduceOper ret = getMROp();
+ ret.mapPlan.add(ld);
+ MRPlan.add(ret);
+ MRPlan.connect(old, ret);
+ return ret;
+ }
+
+ /**
+ * Returns a temporary DFS Path
+ * @return
+ * @throws IOException
+ */
+ private FileSpec getTempFileSpec() throws IOException {
+ return new FileSpec(FileLocalizer.getTemporaryPath(null, pigContext).toString(),
+ BinStorage.class.getName());
+ }
+
+ /**
+ * Merges the map MROpers in the compiledInputs into a single
+ * merged map MROper and returns a List with the merged map MROper
+ * as the first oper and the rest being reduce MROpers.
+ *
+ * Care is taken to remove the map MROpers that are merged from the
+ * MRPlan and their connections moved over to the merged map MROper.
+ *
+ * Merge is implemented as a sequence of binary merges.
+ * merge(PhyPlan finPlan, List<PhyPlan> lst) := finPlan,merge(p) foreach p in lst
+ *
+ * @param compiledInputs
+ * @return
+ * @throws PlanException
+ * @throws IOException
+ */
+ private List<MapReduceOper> merge(MapReduceOper[] compiledInputs)
+ throws PlanException {
+ List<MapReduceOper> ret = new ArrayList<MapReduceOper>();
+
+ MapReduceOper mergedMap = getMROp();
+ ret.add(mergedMap);
+ MRPlan.add(mergedMap);
+
+ Set<MapReduceOper> toBeConnected = new HashSet<MapReduceOper>();
+ List<MapReduceOper> remLst = new ArrayList<MapReduceOper>();
+
+ List<PhysicalPlan<PhysicalOperator>> mpLst = new ArrayList<PhysicalPlan<PhysicalOperator>>();
+
+ for (MapReduceOper mro : compiledInputs) {
+ if (!mro.isMapDone()) {
+ remLst.add(mro);
+ mpLst.add(mro.mapPlan);
+ List<MapReduceOper> pmros = MRPlan.getPredecessors(mro);
+ if(pmros!=null){
+ for(MapReduceOper pmro : pmros)
+ toBeConnected.add(pmro);
+ }
+ } else if (mro.isMapDone() && !mro.isReduceDone()) {
+ ret.add(mro);
+ } else {
+ log.warn(
+ "Both map and reduce phases have been done. This is unexpected for a merge!");
+ throw new PlanException(
+ "Both map and reduce phases have been done. This is unexpected for a merge!");
+ }
+ }
+ merge(ret.get(0).mapPlan, mpLst);
+ Iterator<MapReduceOper> it = toBeConnected.iterator();
+ while(it.hasNext())
+ MRPlan.connect(it.next(), mergedMap);
+ for(MapReduceOper rmro : remLst)
+ MRPlan.remove(rmro);
+ return ret;
+ }
+
+ /**
+ * The merge of a list of map plans
+ * @param <O>
+ * @param <E>
+ * @param finPlan - Final Plan into which the list of plans is merged
+ * @param plans - list of map plans to be merged
+ * @throws PlanException
+ */
+ private <O extends Operator, E extends OperatorPlan<O>> void merge(
+ E finPlan, List<E> plans) throws PlanException {
+ for (E e : plans) {
+ finPlan.merge(e);
+ }
+ }
+
+ /**
+ * The visitOp methods that decide what to do with the current operator
+ */
+
+ public void visitSplit(POSplit op) throws VisitorException{
+ try{
+ split(op);
+ }catch(Exception e){
+ VisitorException pe = new VisitorException(e.getMessage());
+ pe.initCause(e);
+ throw pe;
+ }
+ }
+
+ public void visitLoad(POLoad op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ }catch(Exception e){
+ VisitorException pe = new VisitorException(e.getMessage());
+ pe.initCause(e);
+ throw pe;
+ }
+ }
+
+ public void visitStore(POStore op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ }catch(Exception e){
+ VisitorException pe = new VisitorException(e.getMessage());
+ pe.initCause(e);
+ throw pe;
+ }
+ }
+
+ public void visitFilter(POFilter op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ }catch(Exception e){
+ VisitorException pe = new VisitorException(e.getMessage());
+ pe.initCause(e);
+ throw pe;
+ }
+ }
+
+ public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
+ try{
+ nonBlocking(op);
+ }catch(Exception e){
+ VisitorException pe = new VisitorException(e.getMessage());
+ pe.initCause(e);
+ throw pe;
+ }
+ }
+
+ public void visitForEach(POForEach op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ }catch(Exception e){
+ VisitorException pe = new VisitorException(e.getMessage());
+ pe.initCause(e);
+ throw pe;
+ }
+ }
+
+ public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
+ try{
+ blocking(op);
+ }catch(Exception e){
+ VisitorException pe = new VisitorException(e.getMessage());
+ pe.initCause(e);
+ throw pe;
+ }
+ }
+
+ public void visitPackage(POPackage op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ }catch(Exception e){
+ VisitorException pe = new VisitorException(e.getMessage());
+ pe.initCause(e);
+ throw pe;
+ }
+ }
+
+ public void visitUnion(POUnion op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ }catch(Exception e){
+ VisitorException pe = new VisitorException(e.getMessage());
+ pe.initCause(e);
+ throw pe;
+ }
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java?rev=656011&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java Tue May 13 14:11:21 2008
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.PlanException;
+//import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * An operator model for a Map Reduce job.
+ * Acts as a host to the plans that will
+ * execute in map, reduce and optionally combine
+ * phases. These will be embedded in the MROperPlan
+ * in order to capture the dependecies amongst jobs.
+ */
+public class MapReduceOper extends Operator<MROpPlanVisitor> {
+ //The physical plan that should be executed
+ //in the map phase
+ public PhysicalPlan<PhysicalOperator> mapPlan;
+
+ //The physical plan that should be executed
+ //in the reduce phase
+ public PhysicalPlan<PhysicalOperator> reducePlan;
+
+ //The physical plan that should be executed
+ //in the combine phase if one exists. Will be used
+ //by the optimizer.
+ public PhysicalPlan<PhysicalOperator> combinePlan;
+
+ //Indicates that the map plan creation
+ //is complete
+ boolean mapDone = false;
+
+ //Indicates that the reduce plan creation
+ //is complete
+ boolean reduceDone = false;
+
+ //Indicates if this job is an order by job
+ boolean globalSort = false;
+
+ public List<String> UDFs;
+
+ NodeIdGenerator nig;
+
+ private String scope;
+
+ public MapReduceOper(OperatorKey k) {
+ super(k);
+ mapPlan = new PhysicalPlan<PhysicalOperator>();
+ combinePlan = new PhysicalPlan<PhysicalOperator>();
+ reducePlan = new PhysicalPlan<PhysicalOperator>();
+ UDFs = new ArrayList<String>();
+ nig = NodeIdGenerator.getGenerator();
+ scope = "MapReduceOper";
+ }
+
+ /*@Override
+ public String name() {
+ return "MapReduce - " + mKey.toString();
+ }*/
+
+ private String shiftStringByTabs(String DFStr, String tab) {
+ StringBuilder sb = new StringBuilder();
+ String[] spl = DFStr.split("\n");
+ for (int i = 0; i < spl.length; i++) {
+ sb.append(tab);
+ sb.append(spl[i]);
+ sb.append("\n");
+ }
+ sb.delete(sb.length() - "\n".length(), sb.length());
+ return sb.toString();
+ }
+
+ /**
+ * Uses the string representation of the
+ * component plans to identify itself.
+ */
+ @Override
+ public String name() {
+ StringBuilder sb = new StringBuilder("MapReduce - " + mKey.toString()
+ + ":\n");
+ int index = sb.length();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ if(!mapPlan.isEmpty()){
+ mapPlan.explain(baos);
+ String mp = new String(baos.toByteArray());
+ sb.append(shiftStringByTabs(mp, "| "));
+ }
+ else
+ sb.append("Map Plan Empty");
+ if (!reducePlan.isEmpty()){
+ baos.reset();
+ reducePlan.explain(baos);
+ String rp = new String(baos.toByteArray());
+ sb.insert(index, shiftStringByTabs(rp, "| ") + "\n");
+ }
+ else
+ sb.insert(index, "Reduce Plan Empty" + "\n");
+ return sb.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return true;
+ }
+
+ @Override
+ public void visit(MROpPlanVisitor v) throws VisitorException {
+ v.visitMROp(this);
+ }
+
+ public boolean isMapDone() {
+ return mapDone;
+ }
+
+ public void setMapDone(boolean mapDone) throws IOException{
+ this.mapDone = mapDone;
+ }
+
+ public void setMapDoneSingle(boolean mapDone) throws PlanException{
+ this.mapDone = mapDone;
+ if (mapDone && mapPlan.getLeaves().size()>1) {
+ mapPlan.addAsLeaf(getUnion());
+ }
+ }
+
+ public void setMapDoneMultiple(boolean mapDone) throws PlanException{
+ this.mapDone = mapDone;
+ if (mapDone && mapPlan.getLeaves().size()>0) {
+ mapPlan.addAsLeaf(getUnion());
+ }
+ }
+
+ private POUnion getUnion(){
+ return new POUnion(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ }
+
+ /**
+ * Utility method for the MRCompiler that
+ * creates a union operator and adds it as
+ * a leaf to the map plan. Used when there
+ * is more than one input in order to process
+ * them in a single map job
+ * @param mapDone
+ * @throws IOException
+ *//*
+ public void setMapDoneAndMerge(boolean mapDone) throws IOException{
+ this.mapDone = mapDone;
+ if(mapDone){
+ mapPlan.addAsLeaf(GenPhyOp.topUnionOp());
+ }
+ }
+
+ *//**
+ * Same as the above method but checks to see
+ * there is more than one input. It is for
+ * optimization purposes as a single input
+ * pipeline really doesn't need the union operator
+ * to execute in a single job.
+ * @param mapDone
+ * @throws IOException
+ *//*
+ public void setMapDoneAndChkdMerge(boolean mapDone) throws IOException {
+ this.mapDone = mapDone;
+ if (mapDone && mapPlan.getLeaves().size()!=1) {
+ mapPlan.addAsLeaf(GenPhyOp.topUnionOp());
+ }
+ }*/
+
+ public boolean isReduceDone() {
+ return reduceDone;
+ }
+
+ public void setReduceDone(boolean reduceDone){
+ this.reduceDone = reduceDone;
+ }
+
+ public boolean isGlobalSort() {
+ return globalSort;
+ }
+
+ public void setGlobalSort(boolean globalSort) {
+ this.globalSort = globalSort;
+ }
+}