Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/13 23:11:24 UTC

svn commit: r656011 [1/5] - in /incubator/pig/branches/types: ./ lib/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/data/ src/org/apache/pig/impl/io/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/i...

Author: gates
Date: Tue May 13 14:11:21 2008
New Revision: 656011

URL: http://svn.apache.org/viewvc?rev=656011&view=rev
Log:
Shravan's jumbo patch that includes most of the map-reduce layer plus the cogroup, split, and union physical operators.


Added:
    incubator/pig/branches/types/lib/hadoop16.jar   (with props)
    incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobCreationException.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigInputFormat.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapOnly.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigOutputFormat.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigSplit.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/RunnableReporter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROperPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGlobalRearrange.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSplit.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POCast.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestUnion.java
    incubator/pig/branches/types/test/org/apache/pig/test/data/
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC11.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC12.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC13.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC14.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC2.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC3.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC4.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC5.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC6.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC7.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC8.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC9.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/jsTst3
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/jsTst4
    incubator/pig/branches/types/test/org/apache/pig/test/data/InputFiles/
    incubator/pig/branches/types/test/org/apache/pig/test/data/InputFiles/jsTst1.txt
    incubator/pig/branches/types/test/org/apache/pig/test/data/InputFiles/jsTst2.txt
    incubator/pig/branches/types/test/org/apache/pig/test/data/InputFiles/jsTst5.txt
    incubator/pig/branches/types/test/org/apache/pig/test/data/InputFiles/passwd
Removed:
    incubator/pig/branches/types/lib/hadoop14.jar
    incubator/pig/branches/types/lib/hadoop15.jar
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/ExecType.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POFilter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ConstantExpression.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/ExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/BinaryExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/arithmeticOperators/Add.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/arithmeticOperators/Divide.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/arithmeticOperators/Mod.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/arithmeticOperators/Multiply.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/arithmeticOperators/Subtract.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/ComparisonOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/EqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GTOrEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/GreaterThanExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/LTOrEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/LessThanExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/binaryExprOps/comparators/NotEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/CompilationMessageCollector.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/Operator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidationException.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidationExecutor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java
    incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
    incubator/pig/branches/types/test/org/apache/pig/test/OrdAsc.java
    incubator/pig/branches/types/test/org/apache/pig/test/OrdDesc.java
    incubator/pig/branches/types/test/org/apache/pig/test/OrdDescNumeric.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestGrunt.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPigServer.java
    incubator/pig/branches/types/test/org/apache/pig/test/TypeGraphPrinter.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Tue May 13 14:11:21 2008
@@ -40,7 +40,7 @@
     <property name="dist.dir" value="${build.dir}/${final.name}" />
     <property name="build.encoding" value="ISO-8859-1" />
     <!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
-    <property name="hadoop.jarfile" value="hadoop15.jar" />
+    <property name="hadoop.jarfile" value="hadoop16.jar" />
 
     <!-- jar names. TODO we might want to use the svn reversion name in the name in case it is a dev version -->
     <property name="output.jarfile" value="${build.dir}/${final.name}.jar" />
@@ -68,7 +68,7 @@
     <!-- ====================================================== -->
     <!-- setup the classpath -->
     <path id="classpath">
-        <fileset file="${lib.dir}/hadoop15.jar" />
+        <fileset file="${lib.dir}/${hadoop.jarfile}" />
         <fileset file="${lib.dir}/javacc.jar" />
         <fileset file="${lib.dir}/jsch-0.1.33.jar" />
         <fileset file="${lib.dir}/junit-4.1.jar" />
@@ -144,12 +144,13 @@
                  **/test/FakeFSOutputStream.java, **/test/TestPackage.java, **/test/TestForEach.java,
         		**/test/TestLocalRearrange.java, **/test/TestPOUserFunc.java,
         		**/test/TestPODistinct.java, **/test/TestPOSort.java,
-        		**/test/TestSchema.java, **/test/TestLogicalPlanBuilder.java,
-                **/test/FakeFSInputStream.java, **/test/Util.java,
+        		**/test/TestSchema.java, **/test/TestLogicalPlanBuilder.java,**/test/TestUnion.java, **/test/TestMRCompiler.java,
+                **/test/FakeFSInputStream.java, **/test/Util.java, **/test/TestJobSubmission.java,
+        		**/test/TestLocalJobSubmission.java,
                 **/logicalLayer/*.java, **/logicalLayer/parser/NodeIdGenerator.java,
                 **/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
                 **/physicalLayer/topLevelOperators/**/*.java, **/physicalLayer/plans/*.java,
-                **/physicalLayer/Result.java,
+                **/physicalLayer/Result.java, **/mapReduceLayer/**/*.java, 
                 **/physicalLayer/POStatus.java, **/bzip2r/*.java,
                 **/validators/*.java , **/test/TestInputOutputFileValidator.java ,
 				**/test/TestTypeCheckingValidator.java, 
@@ -261,11 +262,16 @@
                 	<include name="**/TestStore.java" />
                 	<include name="**/TestPackage.java" />  
                 	<include name="**/TestLocalRearrange.java" /> 
-                	<include name="**/TestForEach.java" /> 
+                	<include name="**/TestForEach.java" />
+					<include name="**/TestUnion.java" /> 
+                	<include name="**/TestMRCompiler.java" /> 
+                	<include name="**/TestJobSubmission.java" /> 
                 	<include name="**/TestInputOutputFileValidator.java" /> 
                 	<include name="**/TestTypeCheckingValidator.java" /> 
                 	<include name="**/TestSchema.java" /> 
-                	<include name="**/TestLogicalPlanBuilder.java" /> 
+                	<include name="**/TestLogicalPlanBuilder.java" />
+                	<include name="**/TestLocalJobSubmission.java" />
+                	
                     <!--
                     <include name="**/*Test*.java" />
                     <exclude name="**/TestLargeFile.java" />

Added: incubator/pig/branches/types/lib/hadoop16.jar
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/lib/hadoop16.jar?rev=656011&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/pig/branches/types/lib/hadoop16.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: incubator/pig/branches/types/src/org/apache/pig/ExecType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/ExecType.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/ExecType.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/ExecType.java Tue May 13 14:11:21 2008
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.pig;
 
 /**

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue May 13 14:11:21 2008
@@ -154,11 +154,13 @@
         catch (IOException e) {
             throw new ExecException("Failed to create DataStorage", e);
         }
-            
-        log.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
+        
+        String jobTrackerName = conf.get("mapred.job.tracker").toString();
+        log.info("Connecting to map-reduce job tracker at: " + jobTrackerName);
         
         try {
-            jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+            if(!jobTrackerName.equalsIgnoreCase("local"))
+                jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
                                                               JobSubmissionProtocol.versionID, 
                                                               JobTracker.getAddress(conf.getConfiguration()),
                                                               conf.getConfiguration());
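
The change above skips creating the JobSubmissionProtocol RPC proxy when
mapred.job.tracker is set to "local": local-mode Hadoop runs jobs in-process,
so there is no job tracker to reach. A minimal sketch of the same guard; the
helper connectToJobTracker is hypothetical:

    import org.apache.hadoop.conf.Configuration;

    public class LocalModeGuard {
        static void maybeConnect(Configuration conf) {
            String tracker = conf.get("mapred.job.tracker", "local");
            // Only a real cluster exposes a job tracker to proxy to.
            if (!tracker.equalsIgnoreCase("local")) {
                connectToJobTracker(tracker);
            }
        }
        static void connectToJobTracker(String addr) { /* hypothetical */ }
    }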

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Tue May 13 14:11:21 2008
@@ -24,6 +24,13 @@
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.pig.backend.executionengine.ExecException;
 
 /**
@@ -111,6 +118,201 @@
         }
     }
     
+    static BooleanWritable boolWrit = new BooleanWritable();
+    static BytesWritable bytesWrit = new BytesWritable();
+    static Text stringWrit = new Text();
+    static FloatWritable floatWrit = new FloatWritable();
+    static IntWritable intWrit = new IntWritable();
+    static LongWritable longWrit = new LongWritable();
+    static DataBag defDB = BagFactory.getInstance().newDefaultBag();
+    static Tuple defTup = TupleFactory.getInstance().newTuple();
+    
+    public static WritableComparable getWritableComparableTypes(Object o) throws ExecException{
+        WritableComparable wcKey = null;
+        Map<Byte, String> typeToName = genTypeToNameMap();
+        byte type = DataType.findType(o);
+        switch (type) {
+        case DataType.BAG:
+            wcKey = (DataBag) o;
+            break;
+        case DataType.BOOLEAN:
+            boolWrit.set((Boolean)o);
+            wcKey = boolWrit;
+            break;
+        case DataType.BYTEARRAY:
+            byte[] dbaBytes = ((DataByteArray) o).get();
+            bytesWrit.set(dbaBytes,0,dbaBytes.length);
+            wcKey = bytesWrit;
+            break;
+        case DataType.CHARARRAY:
+            stringWrit.set((String) o);
+            wcKey = stringWrit;
+            break;
+        // Currently Hadoop does not have a DoubleWritable
+        // case DataType.DOUBLE:
+        case DataType.FLOAT:
+            floatWrit.set((Float) o);
+            wcKey = floatWrit;
+            break;
+        case DataType.INTEGER:
+            intWrit.set((Integer) o);
+            wcKey = intWrit;
+            break;
+        case DataType.LONG:
+            longWrit.set((Long) o);
+            wcKey = longWrit;
+            break;
+        case DataType.TUPLE:
+            wcKey = (Tuple) o;
+            break;
+//        case DataType.MAP:
+            // Hmm, This is problematic
+            // Need a deep clone to convert a Map into
+            // MapWritable
+            // wcKey = new MapWritable();
+//            break;
+        default:
+            throw new ExecException("The type "
+                    + typeToName.get(type)
+                    + " cannot be collected as a Key type");
+        }
+        return wcKey;
+    }
+    
+    public static WritableComparable getWritableComparableTypes(byte type) throws ExecException{
+        WritableComparable wcKey = null;
+        Map<Byte, String> typeToName = genTypeToNameMap();
+         switch (type) {
+        case DataType.BAG:
+            wcKey = defDB;
+            break;
+        case DataType.BOOLEAN:
+            wcKey = boolWrit;
+            break;
+        case DataType.BYTEARRAY:
+            wcKey = bytesWrit;
+            break;
+        case DataType.CHARARRAY:
+            wcKey = stringWrit;
+            break;
+        // Currently Hadoop does not have a DoubleWritable
+        // case DataType.DOUBLE:
+        case DataType.FLOAT:
+            wcKey = floatWrit;
+            break;
+        case DataType.INTEGER:
+            wcKey = intWrit;
+            break;
+        case DataType.LONG:
+            wcKey = longWrit;
+            break;
+        case DataType.TUPLE:
+            wcKey = defTup;
+            break;
+//        case DataType.MAP:
+            // Hmm, This is problematic
+            // Need a deep clone to convert a Map into
+            // MapWritable
+            // wcKey = new MapWritable();
+//            break;
+        default:
+            throw new ExecException("The type "
+                    + typeToName.get(type)
+                    + " cannot be collected as a Key type");
+        }
+        return wcKey;
+    }
+    
+    /*public static WritableComparable getWritableComparableTypes(Object o) throws ExecException{
+        WritableComparable wcKey = null;
+        Map<Byte, String> typeToName = genTypeToNameMap();
+        byte type = DataType.findType(o);
+        switch (type) {
+        case DataType.BAG:
+            wcKey = (DataBag) o;
+            break;
+        case DataType.BOOLEAN:
+            wcKey = new BooleanWritable((Boolean) o);
+            break;
+        case DataType.BYTEARRAY:
+            wcKey = new BytesWritable(((DataByteArray) o)
+                    .get());
+            break;
+        case DataType.CHARARRAY:
+            wcKey = new Text((String) o);
+            break;
+        // Currently Hadoop does not have a DoubleWritable
+        // case DataType.DOUBLE:
+        case DataType.FLOAT:
+            wcKey = new FloatWritable((Float) o);
+            break;
+        case DataType.INTEGER:
+            wcKey = new IntWritable((Integer) o);
+            break;
+        case DataType.LONG:
+            wcKey = new LongWritable((Long) o);
+            break;
+        case DataType.TUPLE:
+            wcKey = (Tuple) o;
+            break;
+        case DataType.MAP:
+            // Hmm, This is problematic
+            // Need a deep clone to convert a Map into
+            // MapWritable
+            // wcKey = new MapWritable();
+            break;
+        default:
+            throw new ExecException("The type "
+                    + typeToName.get(type)
+                    + " cannot be collected as a Key type");
+        }
+        return wcKey;
+    }
+    
+    public static WritableComparable getWritableComparableTypes(byte type) throws ExecException{
+        WritableComparable wcKey = null;
+        Map<Byte, String> typeToName = genTypeToNameMap();
+         switch (type) {
+        case DataType.BAG:
+            wcKey = DefaultBagFactory.getInstance().newDefaultBag();
+            break;
+        case DataType.BOOLEAN:
+            wcKey = new BooleanWritable();
+            break;
+        case DataType.BYTEARRAY:
+            wcKey = new BytesWritable();
+            break;
+        case DataType.CHARARRAY:
+            wcKey = new Text();
+            break;
+        // Currently Hadoop does not have a DoubleWritable
+        // case DataType.DOUBLE:
+        case DataType.FLOAT:
+            wcKey = new FloatWritable();
+            break;
+        case DataType.INTEGER:
+            wcKey = new IntWritable();
+            break;
+        case DataType.LONG:
+            wcKey = new LongWritable();
+            break;
+        case DataType.TUPLE:
+            wcKey = TupleFactory.getInstance().newTuple();
+            break;
+        case DataType.MAP:
+            // Hmm, This is problematic
+            // Need a deep clone to convert a Map into
+            // MapWritable
+            // wcKey = new MapWritable();
+            break;
+        default:
+            throw new ExecException("The type "
+                    + typeToName.get(type)
+                    + " cannot be collected as a Key type");
+        }
+        return wcKey;
+    }*/
+    
     public static int numTypes(){
         byte[] types = genAllTypes();
         return types.length;
@@ -145,6 +347,24 @@
     public static String findTypeName(Object o) {
         return findTypeName(findType(o));
     }
+    
+    public static Object convertToPigType(WritableComparable key) {
+        if ((key instanceof DataBag) || (key instanceof Tuple))
+            return key;
+        if (key instanceof BooleanWritable)
+            return ((BooleanWritable) key).get();
+        if (key instanceof BytesWritable)
+            return new DataByteArray(((BytesWritable) key).get());
+        if (key instanceof Text)
+            return ((Text) key).toString();
+        if (key instanceof FloatWritable)
+            return ((FloatWritable) key).get();
+        if (key instanceof IntWritable)
+            return ((IntWritable) key).get();
+        if (key instanceof LongWritable)
+            return ((LongWritable) key).get();
+        return null;
+    }
 
     /**
      * Get the type name from the type byte code
@@ -572,5 +792,4 @@
             default :return true ;
         }
     }
-    
 }
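
The two getWritableComparableTypes overloads map Pig values and type codes to
Hadoop WritableComparable keys, and convertToPigType maps the keys back. Note
that the Object overload reuses static Writable instances, so a returned key
must be consumed before the next conversion. A rough round-trip sketch,
assuming the methods behave as in the patch above:

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.DataType;

    public class KeyConversionSketch {
        public static void main(String[] args) throws ExecException {
            // Pig value -> Hadoop key (an IntWritable under the covers).
            WritableComparable key =
                DataType.getWritableComparableTypes(Integer.valueOf(42));
            // Hadoop key -> Pig value; prints 42.
            System.out.println(DataType.convertToPigType(key));
        }
    }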

Added: incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java?rev=656011&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java Tue May 13 14:11:21 2008
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+/**
+ * A tuple composed with the operators to which
+ * it needs to be attached
+ *
+ */
+public class TargetedTuple implements Tuple {
+    private Tuple t;
+    // The list of operators to which this tuple
+    // has to be attached as input.
+    public List<OperatorKey> targetOps = null;
+
+    public TargetedTuple() {
+    }
+
+    public TargetedTuple(Tuple t, List<OperatorKey> targetOps) {
+        this.t = t;
+        this.targetOps = targetOps;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(super.toString());
+        sb.append("[");
+        for (OperatorKey target : targetOps) {
+            sb.append(target.toString());
+            sb.append(",");
+        }
+        sb.replace(sb.length() - 1, sb.length(), "]");
+        return sb.toString();
+    }
+
+    public void write(DataOutput out) throws IOException {
+        t.write(out);
+        out.writeInt(targetOps.size());
+        for (OperatorKey target : targetOps) {
+            // Ideally I should be able to call target.write(out).
+            // Since it doesn't support that yet, handle it here
+            out.writeInt(target.scope.length());
+            out.writeBytes(target.scope);
+            out.writeLong(target.id);
+        }
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        t.readFields(in);
+        targetOps = new ArrayList<OperatorKey>();
+        int size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            OperatorKey target = new OperatorKey();
+            // Ideally I should be able to call target.read(in).
+            // Since it doesn't support that yet, handle it here
+            int scopeSz = in.readInt();
+            byte[] buf = new byte[scopeSz];
+            in.readFully(buf);
+            target.scope = new String(buf);
+            target.id = in.readLong();
+            targetOps.add(target);
+        }
+    }
+
+    public Tuple toTuple() {
+        return t;
+    }
+
+    public List<OperatorKey> getTargetOps() {
+        return targetOps;
+    }
+
+    public void setTargetOps(List<OperatorKey> targetOps) {
+        this.targetOps = targetOps;
+    }
+
+    public void append(Object val) {
+        t.append(val);
+    }
+
+    public Object get(int fieldNum) throws ExecException {
+        return t.get(fieldNum);
+    }
+
+    public List<Object> getAll() {
+        return t.getAll();
+    }
+
+    public long getMemorySize() {
+        return t.getMemorySize();
+    }
+
+    public byte getType(int fieldNum) throws ExecException {
+        return t.getType(fieldNum);
+    }
+
+    public boolean isNull(int fieldNum) throws ExecException {
+        return t.isNull(fieldNum);
+    }
+
+    public void reference(Tuple t) {
+        this.t = t;
+    }
+
+    public void set(int fieldNum, Object val) throws ExecException {
+        t.set(fieldNum, val);
+    }
+
+    public int size() {
+        return t.size();
+    }
+
+    public String toDelimitedString(String delim) throws ExecException {
+        return t.toDelimitedString(delim);
+    }
+
+    public int compareTo(Object o) {
+        return t.compareTo(o);
+    }
+}
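
TargetedTuple bundles a tuple with the OperatorKeys of the operators it should
be attached to, so the map side can route each record after deserialization.
A small serialization round-trip sketch, assuming tuples from TupleFactory
implement write/readFields:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.pig.data.TargetedTuple;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.logicalLayer.OperatorKey;

    public class TargetedTupleSketch {
        public static void main(String[] args) throws Exception {
            Tuple t = TupleFactory.getInstance().newTuple();
            t.append("hello");
            List<OperatorKey> targets = new ArrayList<OperatorKey>();
            targets.add(new OperatorKey("scope", 1L));
            TargetedTuple tt = new TargetedTuple(t, targets);

            // Write out the tuple followed by its target keys...
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            tt.write(new DataOutputStream(bos));

            // ...and read them back into a fresh instance. readFields
            // delegates to the wrapped tuple, so one must be present.
            TargetedTuple copy = new TargetedTuple(
                    TupleFactory.getInstance().newTuple(), null);
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bos.toByteArray())));
            System.out.println(copy.getTargetOps().size()); // prints 1
        }
    }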

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Tue May 13 14:11:21 2008
@@ -194,18 +194,60 @@
         return new DataStorageInputStreamIterator(elements);
     }
     
+    private static InputStream openLFSFile(ElementDescriptor elem) throws IOException{
+        ElementDescriptor[] elements = null;
+        
+        if (elem.exists()) {
+            try {
+                if(! elem.getDataStorage().isContainer(elem.toString())) {
+                    return elem.open();
+                }
+            }
+            catch (DataStorageException e) {
+                throw WrappedIOException.wrap("Failed to determine if elem=" + elem + " is container", e);
+            }
+            
+            ArrayList<ElementDescriptor> arrayList = 
+                new ArrayList<ElementDescriptor>();
+            Iterator<ElementDescriptor> allElements = 
+                ((ContainerDescriptor)elem).iterator();
+            
+            while (allElements.hasNext()) {
+                ElementDescriptor ed = allElements.next();
+                int li = ed.toString().lastIndexOf(File.separatorChar);
+                String fName = ed.toString().substring(li+1);
+                if(fName.charAt(0)=='.')
+                    continue;
+                arrayList.add(ed);
+            }
+            
+            elements = new ElementDescriptor[ arrayList.size() ];
+            arrayList.toArray(elements);
+        
+        } else {
+            // It might be a glob
+            if (!globMatchesFiles(elem, elem.getDataStorage())) {
+                throw new IOException(elem.toString() + " does not exist");
+            }
+        }
+        
+        return new DataStorageInputStreamIterator(elements);
+    }
+    
     static public InputStream open(String fileSpec, PigContext pigContext) throws IOException {
         fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
         if (!fileSpec.startsWith(LOCAL_PREFIX)) {
             init(pigContext);
-            ElementDescriptor elem = pigContext.getDfs().
-            asElement(fullPath(fileSpec, pigContext));
+            ElementDescriptor elem = pigContext.getDfs().asElement(fullPath(fileSpec, pigContext));
             return openDFSFile(elem);
         }
         else {
             fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
             //buffering because we only want buffered streams to be passed to load functions.
-            return new BufferedInputStream(new FileInputStream(fileSpec));
+            /*return new BufferedInputStream(new FileInputStream(fileSpec));*/
+            init(pigContext);
+            ElementDescriptor elem = pigContext.getLfs().asElement(fullPath(fileSpec, pigContext));
+            return openLFSFile(elem);
         }
     }
     
@@ -387,4 +429,12 @@
         }
     }
 
+    public static Random getR() {
+        return r;
+    }
+
+    public static void setR(Random r) {
+        FileLocalizer.r = r;
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java?rev=656011&r1=656010&r2=656011&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/OperatorKey.java Tue May 13 14:11:21 2008
@@ -22,7 +22,7 @@
 
 import java.io.Serializable;
 
-public class OperatorKey implements Serializable {
+public class OperatorKey implements Serializable, Comparable<OperatorKey> {
     private static final long serialVersionUID = 1L;
     
     public String scope;
@@ -66,4 +66,18 @@
     public long getId() {
         return id;
     }
+
+    public int compareTo(OperatorKey o) {
+        int scCmp = scope.compareTo(o.scope);
+        if(scCmp!=0)
+            return scCmp;
+        else{
+            if(id>o.id)
+                return 1;
+            else if(id==o.id)
+                return 0;
+            else
+                return -1;
+        }
+    }
 }
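
OperatorKey now orders by scope first and id second, which gives operator
collections a deterministic sort order (MRCompiler below sorts predecessors
before compiling them). A tiny sketch of the ordering:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.pig.impl.logicalLayer.OperatorKey;

    public class OperatorKeyOrderSketch {
        public static void main(String[] args) {
            List<OperatorKey> keys = new ArrayList<OperatorKey>();
            keys.add(new OperatorKey("scopeB", 1L));
            keys.add(new OperatorKey("scopeA", 2L));
            keys.add(new OperatorKey("scopeA", 1L));
            Collections.sort(keys);
            // Prints scopeA-1, scopeA-2, scopeB-1: scope is compared
            // first, then id.
            for (OperatorKey k : keys) {
                System.out.println(k.scope + "-" + k.getId());
            }
        }
    }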

Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java?rev=656011&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java Tue May 13 14:11:21 2008
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+/**
+ * This is the compiler class that takes an MROperPlan and converts
+ * it into a JobControl object with the relevant dependency info
+ * maintained. The JobControl object is made up of Jobs, each of
+ * which has a JobConf. A MapReduceOper corresponds to a Job,
+ * and the getJobConf method returns the JobConf that is configured
+ * as per the MapReduceOper.
+ */
+public class JobControlCompiler{
+    MROperPlan plan;
+    Configuration conf;
+    PigContext pigContext;
+    
+    /**
+     * The map between MapReduceOpers and their corresponding Jobs
+     */
+    Map<OperatorKey, Job> seen = new Hashtable<OperatorKey, Job>();
+    
+    /**
+     * Top level compile method that issues a call to the recursive
+     * compile method.
+     * @param plan - The MROperPlan to be compiled
+     * @param grpName - The name given to the JobControl
+     * @param conf - The Configuration object having the various properties
+     * @param pigContext - PigContext passed on from the execution engine
+     * @return JobControl object
+     * @throws JobCreationException
+     */
+    public JobControl compile(MROperPlan plan, String grpName, Configuration conf, PigContext pigContext) throws JobCreationException{
+        this.plan = plan;
+        this.conf = conf;
+        this.pigContext = pigContext;
+        JobControl jobCtrl = new JobControl(grpName);
+        
+        List<MapReduceOper> leaevs = new ArrayList<MapReduceOper>();
+        leaevs = plan.getLeaves();
+        
+        for (MapReduceOper mro : leaevs) {
+            jobCtrl.addJob(compile(mro,jobCtrl));
+        }
+        return jobCtrl;
+    }
+    
+    /**
+     * The recursive compilation method that works by doing a depth first 
+     * traversal of the MROperPlan. Compiles a Job for the input MapReduceOper
+     * with the dependencies maintained in jobCtrl
+     * @param mro - Input MapReduceOper for which a Job needs to be compiled
+     * @param jobCtrl - The running JobControl object used to maintain dependencies between jobs
+     * @return Job corresponding to the input mro
+     * @throws JobCreationException
+     */
+    private Job compile(MapReduceOper mro, JobControl jobCtrl) throws JobCreationException {
+        List<MapReduceOper> pred = new ArrayList<MapReduceOper>();
+        pred = plan.getPredecessors(mro);
+        
+        JobConf currJC = null;
+        
+        try{
+            if(pred==null || pred.size()<=0){
+                //No dependencies! Create the JobConf
+                //Construct the Job object with it and return
+                Job ret = null;
+                if(seen.containsKey(mro.getOperatorKey()))
+                    ret = seen.get(mro.getOperatorKey());
+                else{
+                    currJC = getJobConf(mro, conf, pigContext);
+                    ret = new Job(currJC,null);
+                    seen.put(mro.getOperatorKey(), ret);
+                }
+                return ret;
+            }
+            
+            //Has dependencies. So compile all the inputs
+            List compiledInputs = new ArrayList(pred.size());
+            
+            for (MapReduceOper oper : pred) {
+                Job ret = null;
+                if(seen.containsKey(oper.getOperatorKey()))
+                    ret = seen.get(oper.getOperatorKey());
+                else{
+                    ret = compile(oper, jobCtrl);
+                    jobCtrl.addJob(ret);
+                    seen.put(oper.getOperatorKey(),ret);
+                }
+                compiledInputs.add(ret);
+            }
+            //Get JobConf for the current MapReduceOper
+            currJC = getJobConf(mro, conf, pigContext);
+            
+            //Create a new Job with the obtained JobConf
+            //and the compiled inputs as dependent jobs
+            return new Job(currJC,(ArrayList)compiledInputs);
+        }catch(Exception e){
+            JobCreationException jce = new JobCreationException(e);
+            throw jce;
+        }
+    }
+    
+    /**
+     * The method that creates the JobConf corresponding to a MapReduceOper.
+     * Doesn't support Sort or Distinct jobs yet. The assumption is that
+     * every MapReduceOper will have a load and a store. The method removes
+     * the load operator and serializes the input filespec so that PigInputFormat can
+     * take over the creation of splits. It also removes the store operator
+     * and serializes the output filespec so that PigOutputFormat can take over
+     * record writing. The remaining portions of the map and reduce plans are
+     * serialized and stored for the PigMapReduce or PigMapOnly objects to take over
+     * the actual running of the plans.
+     * The Mapper and Reducer classes and the required key/value formats are set.
+     * If this is a map-only job, PigMapOnly is used as the mapper;
+     * otherwise PigMapReduce is used.
+     * A map-reduce job is bound to have a package operator, which is removed from
+     * the reduce plan and serialized so that the PigMapReduce class can use it to package
+     * the indexed tuples received by the reducer.
+     * @param mro - The MapReduceOper for which the JobConf is required
+     * @param conf - the Configuration object from which JobConf is built
+     * @param pigContext - The PigContext passed on from execution engine
+     * @return JobConf corresponding to mro
+     * @throws JobCreationException
+     */
+    private JobConf getJobConf(MapReduceOper mro, Configuration conf, PigContext pigContext) throws JobCreationException{
+        JobConf jobConf = new JobConf(conf);
+        ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
+        ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
+        
+        //Set the User Name for this job. This will be
+        //used as the working directory
+        String user = System.getProperty("user.name");
+        jobConf.setUser(user != null ? user : "Pigster");
+        
+        //Process the POLoads
+        List<PhysicalOperator> lds = getRoots(mro.mapPlan);
+        if(lds!=null && lds.size()>0){
+            for (PhysicalOperator operator : lds) {
+                POLoad ld = (POLoad)operator;
+                //Store the inp filespecs
+                inp.add(ld.getLFile());
+                //Store the target operators for tuples read
+                //from this input
+                List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
+                List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
+                if(ldSucs!=null){
+                    for (PhysicalOperator operator2 : ldSucs) {
+                        ldSucKeys.add(operator2.getOperatorKey());
+                    }
+                }
+                inpTargets.add(ldSucKeys);
+                //Remove the POLoad from the plan
+                mro.mapPlan.remove(ld);
+            }
+        }
+        try{
+            //Create the jar of all functions required
+            File submitJarFile = File.createTempFile("Job", ".jar");
+            FileOutputStream fos = new FileOutputStream(submitJarFile);
+            JarManager.createJar(fos, mro.UDFs, pigContext);
+            
+            //Start setting the JobConf properties
+            jobConf.setJar(submitJarFile.getPath());
+            jobConf.set("pig.inputs", ObjectSerializer.serialize(inp));
+            jobConf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
+            jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
+    
+            jobConf.setInputFormat(PigInputFormat.class);
+            jobConf.setOutputFormat(PigOutputFormat.class);
+            
+            //Process POStore and remove it from the plan
+            POStore st = null;
+            if(mro.reducePlan.isEmpty()){
+                st = (POStore) mro.mapPlan.getLeaves().get(0);
+                mro.mapPlan.remove(st);
+            }
+            else{
+                st = (POStore) mro.reducePlan.getLeaves().get(0);
+                mro.reducePlan.remove(st);
+            }
+            //set out filespecs
+            String outputPath = st.getSFile().getFileName();
+            String outputFuncSpec = st.getSFile().getFuncSpec();
+            jobConf.setOutputPath(new Path(outputPath));
+            jobConf.set("pig.storeFunc", outputFuncSpec);
+            
+            if(mro.reducePlan.isEmpty()){
+                //MapOnly Job
+                jobConf.setMapperClass(PigMapOnly.Map.class);
+                jobConf.setNumReduceTasks(0);
+                jobConf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+            }
+            else{
+                //Map Reduce Job
+                //Process the POPackage operator and remove it from the reduce plan
+                POPackage pack = (POPackage)mro.reducePlan.getRoots().get(0);
+                mro.reducePlan.remove(pack);
+                jobConf.setMapperClass(PigMapReduce.Map.class);
+                jobConf.setReducerClass(PigMapReduce.Reduce.class);
+                jobConf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+                jobConf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
+                jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
+                jobConf.setOutputKeyClass(DataType.getWritableComparableTypes(pack.getKeyType()).getClass());
+                jobConf.setOutputValueClass(IndexedTuple.class);
+            }
+    
+            return jobConf;
+        }catch(Exception e){
+            JobCreationException jce = new JobCreationException(e);
+            throw jce;
+        }
+    }
+    
+    private List<PhysicalOperator> getRoots(PhysicalPlan<PhysicalOperator> php){
+        List<PhysicalOperator> ret = new ArrayList<PhysicalOperator>();
+        for (PhysicalOperator operator : php.getRoots()) {
+            ret.add(operator);
+        }
+        return ret;
+    }
+}
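
A sketch of how the compiled output might be driven, assuming the Hadoop 0.16
JobControl API (a Runnable with allFinished/getFailedJobs/stop) and an
MROperPlan and PigContext obtained elsewhere:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.jobcontrol.JobControl;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.mapReduceLayer.JobControlCompiler;
    import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;

    public class LaunchSketch {
        static void launch(MROperPlan mrPlan, PigContext pigContext)
                throws Exception {
            JobControlCompiler jcc = new JobControlCompiler();
            JobControl jc =
                jcc.compile(mrPlan, "pig-jobs", new Configuration(), pigContext);
            new Thread(jc).start();          // JobControl is a Runnable
            while (!jc.allFinished()) {
                Thread.sleep(1000);          // poll until the DAG drains
            }
            System.out.println("failed jobs: " + jc.getFailedJobs().size());
            jc.stop();
        }
    }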

Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobCreationException.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobCreationException.java?rev=656011&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobCreationException.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobCreationException.java Tue May 13 14:11:21 2008
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
+public class JobCreationException extends FrontendException{
+
+    private static final long serialVersionUID = 1L;
+
+    public JobCreationException() {
+        super();
+    }
+
+    public JobCreationException(String arg0, Throwable arg1) {
+        super(arg0, arg1);
+    }
+
+    public JobCreationException(String arg0) {
+        super(arg0);
+    }
+
+    public JobCreationException(Throwable arg0) {
+        super(arg0);
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=656011&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Tue May 13 14:11:21 2008
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+
+/**
+ * The compiler that compiles a given physical plan
+ * into a DAG of MapReduce operators which can then
+ * be converted into the JobControl structure.
+ *
+ * It is implemented as a visitor of the PhysicalPlan it
+ * is compiling.
+ *
+ * Currently supports all operators except the MR Sort
+ * operator.
+ *
+ * Uses a predecessor-based depth-first traversal.
+ * To compile an operator, it first compiles
+ * the predecessors into MapReduce operators and tries to
+ * merge the current operator into one of them, the goal
+ * being to keep the number of MROpers to a minimum.
+ *
+ * It also merges multiple map jobs, created by compiling
+ * the inputs individually, into a single job.
+ *
+ * Only in the case of blocking operators and splits is a new
+ * MapReduce operator started, using a store-load combination
+ * to connect the two operators. Whenever this happens,
+ * care is taken to add the MROper to the MRPlan and connect it
+ * appropriately.
+ *
+ */
+public class MRCompiler extends PhyPlanVisitor<PhysicalOperator, PhysicalPlan<PhysicalOperator>> {
+    
+    private Log log = LogFactory.getLog(getClass());
+    
+    PigContext pigContext;
+    
+    //The plan that is being compiled
+    PhysicalPlan<PhysicalOperator> plan;
+
+    //The plan of MapReduce Operators
+    MROperPlan MRPlan;
+    
+    //The current MapReduce Operator
+    //that is being compiled
+    MapReduceOper curMROp;
+    
+    //The output of compiling the inputs
+    MapReduceOper[] compiledInputs = null;
+    
+    //The split operators seen so far. If not
+    //maintained they will haunt you.
+    //During the traversal a split is the only
+    //operator that can be revisited from a different
+    //path, so this map stores the split job.
+    //Whenever we hit a split, we create a new MROper,
+    //connect it to the split job using a load-store
+    //combination, and add it to the MRPlan.
+    Map<OperatorKey, MapReduceOper> splitsSeen;
+    
+    NodeIdGenerator nig;
+
+    private String scope;
+    
+    private Random r;
+    
+    public MRCompiler(PhysicalPlan<PhysicalOperator> plan) {
+        this(plan,null);
+    }
+    
+    public MRCompiler(PhysicalPlan<PhysicalOperator> plan,
+            PigContext pigContext) {
+        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan<PhysicalOperator>>(plan));
+        this.plan = plan;
+        this.pigContext = pigContext;
+        splitsSeen = new HashMap<OperatorKey, MapReduceOper>();
+        MRPlan = new MROperPlan();
+        nig = NodeIdGenerator.getGenerator();
+        scope = "MRCompiler";
+        r = new Random(1331);
+        FileLocalizer.setR(r);
+    }
+    
+    /**
+     * Used to get the compiled plan
+     * @return
+     */
+    public MROperPlan getMRPlan() {
+        return MRPlan;
+    }
+    
+    /**
+     * Used to get the plan that was compiled
+     * @return
+     */
+    public PhysicalPlan<PhysicalOperator> getPlan() {
+        return plan;
+    }
+    
+    /**
+     * The front-end method that the user calls to compile
+     * the plan. Assumes that all submitted plans have a Store
+     * operator as the leaf.
+     * @return the compiled plan of MapReduce operators
+     * @throws IOException
+     * @throws PlanException
+     * @throws VisitorException
+     */
+    public MROperPlan compile() throws IOException, PlanException, VisitorException {
+        List<PhysicalOperator> leaves = plan.getLeaves();
+        /*for (PhysicalOperator operator : leaves) {
+            compile(operator);
+            if (!curMROp.isMapDone()) {
+                curMROp.setMapDone(true);
+            } else if (!curMROp.isReduceDone()) {
+                curMROp.setReduceDone(true);
+            }
+        }*/
+        POStore store = (POStore)leaves.get(0);
+        compile(store);
+        
+        return MRPlan;
+    }
+    
+    /**
+     * Compiles the plan below op into a MapReduce Operator
+     * and stores it in curMROp.
+     * @param op
+     * @throws IOException
+     * @throws PlanException
+     * @throws VisitorException
+     */
+    private void compile(PhysicalOperator op) throws IOException,
+    PlanException, VisitorException {
+        //An artifact of the Visitor. Need to save
+        //this so that it is not overwritten.
+        MapReduceOper[] prevCompInp = compiledInputs;
+        
+        //Compile each predecessor into the MROper and 
+        //store them away so that we can use them for compiling
+        //op.
+        List<PhysicalOperator> predecessors = plan.getPredecessors(op);
+        if (predecessors != null && predecessors.size() > 0) {
+            Collections.sort(predecessors);
+            compiledInputs = new MapReduceOper[predecessors.size()];
+            int i = -1;
+            for (PhysicalOperator pred : predecessors) {
+                if(pred instanceof POSplit && splitsSeen.containsKey(pred.getOperatorKey())){
+                    compiledInputs[++i] = startNew(((POSplit)pred).getSplitStore(), splitsSeen.get(pred.getOperatorKey()));
+                    continue;
+                }
+                compile(pred);
+                compiledInputs[++i] = curMROp;
+            }
+        } else {
+            //No predecessors, so mostly a load. This is where
+            //we start: create a new MROper, add op as its first
+            //operator, and add the MROper to the MRPlan.
+            curMROp = getMROp();
+            curMROp.mapPlan.add(op);
+            MRPlan.add(curMROp);
+            return;
+        }
+        
+        //Now that the inputs are compiled, let the visit
+        //method for op decide how to fold op into them.
+        op.visit(this);
+        compiledInputs = prevCompInp;
+    }
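+
+    // Illustrative walkthrough (hypothetical plan): for Load -> Filter ->
+    // Store, compile(Store) recursively compiles Filter, which compiles
+    // Load (creating the first MROper); Filter and Store are then folded
+    // into that MROper's map plan by their visit methods via nonBlocking().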
+    
+    private MapReduceOper getMROp(){
+        return new MapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)));
+    }
+    
+    private POLoad getLoad(){
+        POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        ld.setPc(pigContext);
+        return ld;
+    }
+    
+    private POStore getStore(){
+        POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        st.setPc(pigContext);
+        return st;
+    }
+    
+    /**
+     * A map MROper is an MROper whose map plan is still open
+     * for taking more non-blocking operators.
+     * A reduce MROper is an MROper whose map plan is done but
+     * whose reduce plan is open for taking more non-blocking
+     * operators.
+     * 
+     * Used for compiling non-blocking operators. The logic here
+     * is simple. If there is a single input, just push the operator
+     * into whichever phase is open. Otherwise, we merge the compiled
+     * inputs into a list of MROpers where the first oper is the merged
+     * oper consisting of all the map MROpers and the rest are reduce
+     * MROpers, as reduce plans can't be merged.
+     * Then we add the input oper op into the merged map MROper's map
+     * plan as a leaf and connect the reduce MROpers to that leaf using
+     * store-load combinations. Care is also taken to connect the
+     * MROpers according to the dependencies.
+     * @param op
+     * @throws PlanException
+     * @throws IOException
+     */
+    private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{
+        if (compiledInputs.length == 1) {
+            //For speed
+            MapReduceOper mro = compiledInputs[0];
+            if (!mro.isMapDone()) {
+                mro.mapPlan.addAsLeaf(op);
+            } else if (mro.isMapDone() && !mro.isReduceDone()) {
+                mro.reducePlan.addAsLeaf(op);
+            } else {
+                log.warn("Both map and reduce phases have been done. This is unexpected while compiling!");
+            }
+            curMROp = mro;
+        } else {
+            List<MapReduceOper> mergedPlans = merge(compiledInputs);
+            
+            //The first MROper is always the merged map MROper
+            MapReduceOper mro = mergedPlans.remove(0);
+            //Push the input operator into the merged map MROper
+            mro.mapPlan.addAsLeaf(op);
+            
+            //Connect all the reduce MROpers
+            if(mergedPlans.size()>0)
+                connRedOper(mergedPlans, mro);
+            
+            //return the compiled MROper
+            curMROp = mro;
+        }
+    }
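+
+    // Illustrative example (hypothetical shapes): compiling a POUnion whose
+    // inputs became map MROpers M1, M2 and reduce MROper R1 yields one
+    // merged map MROper with the union as its map-plan leaf; R1 feeds that
+    // leaf through a store-load pair and an MRPlan edge (see connRedOper).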
+    
+    /**
+     * Used for compiling blocking operators. If there is a single input
+     * and its map phase is still open, then close it so that further
+     * operators can be compiled into the reduce phase. If its reduce phase
+     * is open, add a store and close it, then start a new map MROper into
+     * which further operators can be compiled.
+     * 
+     * If there are multiple inputs, the logic 
+     * is to merge all map MROpers into one map MROper and retain
+     * the reduce MROpers. Since the operator is blocking, it has
+     * to be a Global Rearrange, at least for now. This operator need not
+     * be inserted into our plan as it is implemented by Hadoop.
+     * But it creates the map-reduce boundary, so the merged map MROper
+     * is closed and its reduce phase is started. Depending on the number
+     * of reduce MROpers and the number of pipelines in the map MROper,
+     * a Union operator is inserted where necessary. This also leads to the
+     * possibility of empty map plans, which have to be handled carefully
+     * in the PigMapReduce class. If there are no map
+     * plans, then a new one is created as a side effect of the merge
+     * process. If there are no reduce MROpers and only a single pipeline
+     * in the map, then no Union oper is added. Otherwise a Union oper is 
+     * added to the merged map MROper, and all the reduce MROpers 
+     * are connected to it by store-load combinations. Care is taken
+     * to connect the MROpers in the MRPlan.  
+     * @param op
+     * @throws IOException
+     * @throws PlanException
+     */
+    private void blocking(PhysicalOperator op) throws IOException, PlanException{
+        if(compiledInputs.length==1){
+            MapReduceOper mro = compiledInputs[0];
+            if (!mro.isMapDone()) {
+                mro.setMapDoneSingle(true);
+                curMROp = mro;
+            }
+            else if(mro.isMapDone() && !mro.isReduceDone()){
+                FileSpec fSpec = getTempFileSpec();
+                
+                POStore st = getStore();
+                st.setSFile(fSpec);
+                mro.reducePlan.addAsLeaf(st);
+                mro.setReduceDone(true);
+                curMROp = startNew(fSpec, mro);
+                curMROp.setMapDone(true);
+            }
+        }
+        else{
+            List<MapReduceOper> mergedPlans = merge(compiledInputs);
+            MapReduceOper mro = mergedPlans.remove(0);
+            
+            if(mergedPlans.size()>0)
+                mro.setMapDoneMultiple(true);
+            else
+                mro.setMapDoneSingle(true);
+
+            // Connect all the reduce MROpers
+            if(mergedPlans.size()>0)
+                connRedOper(mergedPlans, mro);
+            curMROp = mro;
+        }
+    }
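+
+    // Illustrative example (hypothetical): a POGlobalRearrange over two
+    // open map MROpers closes the merged map plan (a POUnion is inserted
+    // when more than one pipeline remains); the POPackage and subsequent
+    // operators then compile into the reduce plan of curMROp.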
+    
+    /**
+     * Connects the reduce MROpers to the leaf node in the map MROper mro
+     * by adding the appropriate loads
+     * @param mergedPlans - The list of reduce MROpers
+     * @param mro - The map MROper
+     * @throws PlanException
+     * @throws IOException
+     */
+    private void connRedOper(List<MapReduceOper> mergedPlans, MapReduceOper mro) throws PlanException, IOException{
+        PhysicalOperator leaf = null;
+        List<PhysicalOperator> leaves = mro.mapPlan.getLeaves();
+        if(leaves!=null && leaves.size()>0)
+            leaf = leaves.get(0);
+
+        for (MapReduceOper mmro : mergedPlans) {
+            mmro.setReduceDone(true);
+            FileSpec fileSpec = getTempFileSpec();
+            POLoad ld = getLoad();
+            ld.setLFile(fileSpec);
+            POStore str = getStore();
+            str.setSFile(fileSpec);
+            mmro.reducePlan.addAsLeaf(str);
+            mro.mapPlan.add(ld);
+            if(leaf!=null)
+                mro.mapPlan.connect(ld, leaf);
+            MRPlan.connect(mmro, mro);
+        }
+    }
+    
+    /**
+     * Used to compile a split operator. The logic is to
+     * close the split job by replacing the split oper with
+     * a store, then create a new map MROper and return it
+     * as the current MROper into which other operators
+     * will be compiled. The new MROper is connected
+     * to the split job by load-store. The split oper is
+     * also added to the splitsSeen map.
+     * @param op
+     * @throws PlanException 
+     */
+    private void split(POSplit op) throws PlanException{
+        MapReduceOper mro = compiledInputs[0];
+        FileSpec fSpec = op.getSplitStore();
+        POStore str = getStore();
+        str.setSFile(fSpec);
+        if (!mro.isMapDone()) {
+            mro.mapPlan.addAsLeaf(str);
+            mro.setMapDoneSingle(true);
+        } else if (mro.isMapDone() && !mro.isReduceDone()) {
+            mro.reducePlan.addAsLeaf(str);
+            mro.setReduceDone(true);
+        } else {
+            log.warn("Both map and reduce phases have been done. This is unexpected while compiling!");
+        }
+        splitsSeen.put(op.getOperatorKey(), mro);
+        curMROp = startNew(fSpec, mro);
+    }
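+
+    // Illustrative example (hypothetical script): if relation A feeds both
+    // a FILTER and a GROUP, the POSplit for A is compiled once here (its
+    // output stored to the file given by getSplitStore() and the MROper
+    // recorded in splitsSeen); each later consumer starts a fresh map
+    // MROper that loads that file (see compile() and startNew()).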
+    
+    /**
+     * Starts a new MROper and connects it to the old
+     * one by load-store. The assumption is that the 
+     * store is already inserted into the old MROper.
+     * @param fSpec
+     * @param old
+     * @return the new MROper
+     * @throws PlanException 
+     */
+    private MapReduceOper startNew(FileSpec fSpec, MapReduceOper old) throws PlanException{
+        POLoad ld = getLoad();
+        ld.setLFile(fSpec);
+        MapReduceOper ret = getMROp();
+        ret.mapPlan.add(ld);
+        MRPlan.add(ret);
+        MRPlan.connect(old, ret);
+        return ret;
+    }
+    
+    /**
+     * Returns a FileSpec for a temporary DFS path,
+     * to be stored using BinStorage
+     * @return the temporary FileSpec
+     * @throws IOException
+     */
+    private FileSpec getTempFileSpec() throws IOException {
+        return new FileSpec(FileLocalizer.getTemporaryPath(null, pigContext).toString(),
+                BinStorage.class.getName());
+    }
+    
+    /**
+     * Merges the map MROpers in the compiledInputs into a single
+     * merged map MROper and returns a List with the merged map MROper
+     * as the first oper and the rest being reduce MROpers.
+     * 
+     * Care is taken to remove the map MROpers that are merged from the
+     * MRPlan and to move their connections over to the merged map MROper.
+     * 
+     * Merge is implemented as a sequence of binary merges:
+     * merge(finPlan, lst) := finPlan.merge(p) for each p in lst
+     *   
+     * @param compiledInputs
+     * @return the list of merged MROpers
+     * @throws PlanException
+     */
+    private List<MapReduceOper> merge(MapReduceOper[] compiledInputs)
+            throws PlanException {
+        List<MapReduceOper> ret = new ArrayList<MapReduceOper>();
+        
+        MapReduceOper mergedMap = getMROp();
+        ret.add(mergedMap);
+        MRPlan.add(mergedMap);
+        
+        Set<MapReduceOper> toBeConnected = new HashSet<MapReduceOper>();
+        List<MapReduceOper> remLst = new ArrayList<MapReduceOper>();
+
+        List<PhysicalPlan<PhysicalOperator>> mpLst = new ArrayList<PhysicalPlan<PhysicalOperator>>();
+
+        for (MapReduceOper mro : compiledInputs) {
+            if (!mro.isMapDone()) {
+                remLst.add(mro);
+                mpLst.add(mro.mapPlan);
+                List<MapReduceOper> pmros = MRPlan.getPredecessors(mro);
+                if(pmros!=null){
+                    for(MapReduceOper pmro : pmros)
+                        toBeConnected.add(pmro);
+                }
+            } else if (mro.isMapDone() && !mro.isReduceDone()) {
+                ret.add(mro);
+            } else {
+                log.warn(
+                        "Both map and reduce phases have been done. This is unexpected for a merge!");
+                throw new PlanException(
+                        "Both map and reduce phases have been done. This is unexpected for a merge!");
+            }
+        }
+        merge(ret.get(0).mapPlan, mpLst);
+        Iterator<MapReduceOper> it = toBeConnected.iterator();
+        while(it.hasNext())
+            MRPlan.connect(it.next(), mergedMap);
+        for(MapReduceOper rmro : remLst)
+            MRPlan.remove(rmro);
+        return ret;
+    }
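+
+    // Note: the actual plan merging is delegated to the generic
+    // merge(finPlan, plans) helper below; this method handles the MRPlan
+    // bookkeeping (moving predecessor edges over to the merged MROper and
+    // removing the MROpers that were consumed by the merge).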
+    
+    /**
+     * The merge of a list of map plans
+     * @param <O>
+     * @param <E>
+     * @param finPlan - Final Plan into which the list of plans is merged
+     * @param plans - list of map plans to be merged
+     * @throws PlanException
+     */
+    private <O extends Operator, E extends OperatorPlan<O>> void merge(
+            E finPlan, List<E> plans) throws PlanException {
+        for (E e : plans) {
+            finPlan.merge(e);
+        }
+    }
+    
+    /*
+     * The visitOp methods that decide what to do with the current operator
+     */
+    
+    public void visitSplit(POSplit op) throws VisitorException{
+        try{
+            split(op);
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
+    
+    public void visitLoad(POLoad op) throws VisitorException{
+        try{
+            nonBlocking(op);
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
+    
+    public void visitStore(POStore op) throws VisitorException{
+        try{
+            nonBlocking(op);
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
+    
+    public void visitFilter(POFilter op) throws VisitorException{
+        try{
+            nonBlocking(op);
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
+    
+    public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
+        try{
+            nonBlocking(op);
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
+    
+    public void visitForEach(POForEach op) throws VisitorException{
+        try{
+            nonBlocking(op);
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
+    
+    public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
+        try{
+            blocking(op);
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
+    
+    public void visitPackage(POPackage op) throws VisitorException{
+        try{
+            nonBlocking(op);
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
+    
+    public void visitUnion(POUnion op) throws VisitorException{
+        try{
+            nonBlocking(op);
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java?rev=656011&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java Tue May 13 14:11:21 2008
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.PlanException;
+//import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * An operator model for a Map Reduce job. 
+ * Acts as a host to the plans that will
+ * execute in the map, reduce and optionally combine
+ * phases. These will be embedded in the MROperPlan
+ * in order to capture the dependencies amongst jobs.
+ */
+public class MapReduceOper extends Operator<MROpPlanVisitor> {
+    //The physical plan that should be executed
+    //in the map phase
+    public PhysicalPlan<PhysicalOperator> mapPlan;
+    
+    //The physical plan that should be executed
+    //in the reduce phase
+    public PhysicalPlan<PhysicalOperator> reducePlan;
+    
+    //The physical plan that should be executed
+    //in the combine phase if one exists. Will be used
+    //by the optimizer.
+    public PhysicalPlan<PhysicalOperator> combinePlan;
+    
+    //Indicates that the map plan creation
+    //is complete
+    boolean mapDone = false;
+    
+    //Indicates that the reduce plan creation
+    //is complete
+    boolean reduceDone = false;
+    
+    //Indicates if this job is an order by job
+    boolean globalSort = false;
+    
+    public List<String> UDFs;
+    
+    NodeIdGenerator nig;
+
+    private String scope;
+
+    public MapReduceOper(OperatorKey k) {
+        super(k);
+        mapPlan = new PhysicalPlan<PhysicalOperator>();
+        combinePlan = new PhysicalPlan<PhysicalOperator>();
+        reducePlan = new PhysicalPlan<PhysicalOperator>();
+        UDFs = new ArrayList<String>();
+        nig = NodeIdGenerator.getGenerator();
+        scope = "MapReduceOper";
+    }
+
+    /*@Override
+    public String name() {
+        return "MapReduce - " + mKey.toString();
+    }*/
+    
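+    // Prefixes every line of DFStr with the given tab string, e.g.
+    // shiftStringByTabs("a\nb", "|   ") returns "|   a\n|   b"
+    // (the trailing newline is dropped).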
+    private String shiftStringByTabs(String DFStr, String tab) {
+        StringBuilder sb = new StringBuilder();
+        String[] spl = DFStr.split("\n");
+        for (int i = 0; i < spl.length; i++) {
+            sb.append(tab);
+            sb.append(spl[i]);
+            sb.append("\n");
+        }
+        sb.delete(sb.length() - "\n".length(), sb.length());
+        return sb.toString();
+    }
+    
+    /**
+     * Uses the string representation of the 
+     * component plans to identify itself.
+     */
+    @Override
+    public String name() {
+        StringBuilder sb = new StringBuilder("MapReduce - " + mKey.toString()
+                + ":\n");
+        int index = sb.length();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        if(!mapPlan.isEmpty()){
+            mapPlan.explain(baos);
+            String mp = new String(baos.toByteArray());
+            sb.append(shiftStringByTabs(mp, "|   "));
+        }
+        else
+            sb.append("Map Plan Empty");
+        if (!reducePlan.isEmpty()){
+            baos.reset();
+            reducePlan.explain(baos);
+            String rp = new String(baos.toByteArray());
+            sb.insert(index, shiftStringByTabs(rp, "|   ") + "\n");
+        }
+        else
+            sb.insert(index, "Reduce Plan Empty" + "\n");
+        return sb.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+    @Override
+    public void visit(MROpPlanVisitor v) throws VisitorException {
+        v.visitMROp(this);
+    }
+    
+    public boolean isMapDone() {
+        return mapDone;
+    }
+    
+    public void setMapDone(boolean mapDone) throws IOException{
+        this.mapDone = mapDone;
+    }
+    
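+    // setMapDoneSingle closes the map plan and inserts a POUnion only when
+    // more than one pipeline (leaf) exists; setMapDoneMultiple below inserts
+    // one whenever the map plan is non-empty, since reduce MROpers will be
+    // connected into it afterwards.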
+    public void setMapDoneSingle(boolean mapDone) throws PlanException{
+        this.mapDone = mapDone;
+        if (mapDone && mapPlan.getLeaves().size()>1) {
+            mapPlan.addAsLeaf(getUnion());
+        }
+    }
+    
+    public void setMapDoneMultiple(boolean mapDone) throws PlanException{
+        this.mapDone = mapDone;
+        if (mapDone && mapPlan.getLeaves().size()>0) {
+            mapPlan.addAsLeaf(getUnion());
+        }
+    }
+    
+    private POUnion getUnion(){
+        return new POUnion(new OperatorKey(scope,nig.getNextNodeId(scope)));
+    }
+    
+    /**
+     * Utility method for the MRCompiler that
+     * creates a union operator and adds it as
+     * a leaf to the map plan. Used when there
+     * is more than one input in order to process
+     * them in a single map job  
+     * @param mapDone
+     * @throws IOException
+     *//*
+    public void setMapDoneAndMerge(boolean mapDone) throws IOException{
+        this.mapDone = mapDone;
+        if(mapDone){
+            mapPlan.addAsLeaf(GenPhyOp.topUnionOp());
+        }
+    }
+    
+    *//**
+     * Same as the above method but checks to see
+     * there is more than one input. It is for 
+     * optimization purposes as a single input
+     * pipeline really doesn't need the union operator
+     * to execute in a single job.
+     * @param mapDone
+     * @throws IOException
+     *//*
+    public void setMapDoneAndChkdMerge(boolean mapDone) throws IOException {
+        this.mapDone = mapDone;
+        if (mapDone && mapPlan.getLeaves().size()!=1) {
+            mapPlan.addAsLeaf(GenPhyOp.topUnionOp());
+        }
+    }*/
+
+    public boolean isReduceDone() {
+        return reduceDone;
+    }
+
+    public void setReduceDone(boolean reduceDone){
+        this.reduceDone = reduceDone;
+    }
+    
+    public boolean isGlobalSort() {
+        return globalSort;
+    }
+
+    public void setGlobalSort(boolean globalSort) {
+        this.globalSort = globalSort;
+    }
+}