You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/24 18:19:27 UTC

svn commit: r1589784 - in /pig/trunk: ./ src/org/apache/pig/scripting/ src/org/apache/pig/tools/pigstats/ src/org/apache/pig/tools/pigstats/mapreduce/ test/org/apache/pig/test/

Author: cheolsoo
Date: Thu Apr 24 16:19:26 2014
New Revision: 1589784

URL: http://svn.apache.org/r1589784
Log:
PIG-3898: Refactor PPNL for non-MR execution engine (cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
    pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Apr 24 16:19:26 2014
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
  
 INCOMPATIBLE CHANGES
 
+PIG-3898: Refactor PPNL for non-MR execution engine (cheolsoo)
+
 PIG-3485: Remove CastUtils.bytesToMap(byte[] b) method from LoadCaster interface (cheolsoo) 
 
 PIG-3419: Pluggable Execution Engine (achalsoni81 via cheolsoo)

Modified: pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java Thu Apr 24 16:19:26 2014
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
@@ -44,7 +44,7 @@ class SyncProgressNotificationAdaptor im
             for (PigProgressNotificationListener listener : listeners) {
                 listener.jobFailedNotification(scriptId, jobStats);
             }
-        }        
+        }
     }
 
     @Override
@@ -53,7 +53,7 @@ class SyncProgressNotificationAdaptor im
             for (PigProgressNotificationListener listener : listeners) {
                 listener.jobFinishedNotification(scriptId, jobStats);
             }
-        }        
+        }
     }
 
     @Override
@@ -62,7 +62,7 @@ class SyncProgressNotificationAdaptor im
             for (PigProgressNotificationListener listener : listeners) {
                 listener.jobStartedNotification(scriptId, assignedJobId);
             }
-        }        
+        }
     }
 
     @Override
@@ -71,7 +71,7 @@ class SyncProgressNotificationAdaptor im
             for (PigProgressNotificationListener listener : listeners) {
                 listener.jobsSubmittedNotification(scriptId, numJobsSubmitted);
             }
-        }        
+        }
     }
 
     @Override
@@ -81,11 +81,11 @@ class SyncProgressNotificationAdaptor im
             for (PigProgressNotificationListener listener : listeners) {
                 listener.launchCompletedNotification(scriptId, numJobsSucceeded);
             }
-        }        
+        }
     }
 
     @Override
-    public void initialPlanNotification(String scriptId, MROperPlan plan) {
+    public void initialPlanNotification(String scriptId, OperatorPlan<?> plan) {
         synchronized (listeners) {
             for (PigProgressNotificationListener listener : listeners) {
                 try {
@@ -105,7 +105,7 @@ class SyncProgressNotificationAdaptor im
             for (PigProgressNotificationListener listener : listeners) {
                 listener.launchStartedNotification(scriptId, numJobsToLaunch);
             }
-        }        
+        }
     }
 
     @Override
@@ -115,7 +115,7 @@ class SyncProgressNotificationAdaptor im
             for (PigProgressNotificationListener listener : listeners) {
                 listener.outputCompletedNotification(scriptId, outputStats);
             }
-        }        
+        }
     }
 
     @Override

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java Thu Apr 24 16:19:26 2014
@@ -18,9 +18,9 @@
 
 package org.apache.pig.tools.pigstats;
 
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.plan.OperatorPlan;
 
 import org.apache.pig.PigRunner;
 
@@ -33,66 +33,66 @@ import org.apache.pig.PigRunner;
 public interface PigProgressNotificationListener extends java.util.EventListener {
 
     /**
-     * Invoked before any MR jobs are run with the plan that is to be executed.
+     * Invoked with the plan that is to be executed, before any Hadoop jobs are run.
      *
      * @param scriptId the unique id of the script
-     * @param plan the MROperPlan that is to be executed
+     * @param plan the OperatorPlan that is to be executed
      */
-    public void initialPlanNotification(String scriptId, MROperPlan plan);
+    public void initialPlanNotification(String scriptId, OperatorPlan<?> plan);
 
     /**
-     * Invoked just before launching MR jobs spawned by the script.
+     * Invoked just before launching Hadoop jobs spawned by the script.
      * @param scriptId the unique id of the script
-     * @param numJobsToLaunch the total number of MR jobs spawned by the script
+     * @param numJobsToLaunch the total number of Hadoop jobs spawned by the script
      */
     public void launchStartedNotification(String scriptId, int numJobsToLaunch);
-    
+
     /**
-     * Invoked just before submitting a batch of MR jobs.
+     * Invoked just before submitting a batch of Hadoop jobs.
      * @param scriptId the unique id of the script
-     * @param numJobsSubmitted the number of MR jobs in the batch
+     * @param numJobsSubmitted the number of Hadoop jobs in the batch
      */
     public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted);
-    
+
     /**
-     * Invoked after a MR job is started.
+     * Invoked after a Hadoop job is started.
      * @param scriptId the unique id of the script 
-     * @param assignedJobId the MR job id
+     * @param assignedJobId the Hadoop job id
      */
     public void jobStartedNotification(String scriptId, String assignedJobId);
-    
+
     /**
-     * Invoked just after a MR job is completed successfully. 
+     * Invoked just after a Hadoop job is completed successfully. 
      * @param scriptId the unique id of the script 
-     * @param jobStats the {@link JobStats} object associated with the MR job
+     * @param jobStats the {@link JobStats} object associated with the Hadoop job
      */
     public void jobFinishedNotification(String scriptId, JobStats jobStats);
-    
+
     /**
-     * Invoked when a MR job fails.
+     * Invoked when a Hadoop job fails.
      * @param scriptId the unique id of the script 
-     * @param jobStats the {@link JobStats} object associated with the MR job
+     * @param jobStats the {@link JobStats} object associated with the Hadoop job
      */
     public void jobFailedNotification(String scriptId, JobStats jobStats);
-    
+
     /**
      * Invoked just after an output is successfully written.
      * @param scriptId the unique id of the script
      * @param outputStats the {@link OutputStats} object associated with the output
      */
     public void outputCompletedNotification(String scriptId, OutputStats outputStats);
-    
+
     /**
      * Invoked to update the execution progress. 
      * @param scriptId the unique id of the script
      * @param progress the percentage of the execution progress
      */
     public void progressUpdatedNotification(String scriptId, int progress);
-    
+
     /**
-     * Invoked just after all MR jobs spawned by the script are completed.
+     * Invoked just after all Hadoop jobs spawned by the script are completed.
      * @param scriptId the unique id of the script
-     * @param numJobsSucceeded the total number of MR jobs succeeded
+     * @param numJobsSucceeded the total number of Hadoop jobs that succeeded
      */
     public void launchCompletedNotification(String scriptId, int numJobsSucceeded);
 }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Thu Apr 24 16:19:26 2014
@@ -22,8 +22,8 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -36,8 +36,32 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.OriginalLocation;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.newplan.logical.relational.LOCogroup;
 import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE;
@@ -58,6 +82,8 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
 import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 
+import com.google.common.collect.Lists;
+
 /**
  * ScriptStates encapsulates settings for a Pig script that runs on a hadoop
  * cluster. These settings are added to all MR jobs spawned by the script and in
@@ -71,18 +97,18 @@ public abstract class ScriptState {
      * Keys of Pig settings added to Jobs
      */
     protected enum PIG_PROPERTY {
-        SCRIPT_ID("pig.script.id"), 
-        SCRIPT("pig.script"), 
-        COMMAND_LINE("pig.command.line"), 
-        HADOOP_VERSION("pig.hadoop.version"), 
-        VERSION("pig.version"), 
-        INPUT_DIRS("pig.input.dirs"), 
-        MAP_OUTPUT_DIRS("pig.map.output.dirs"), 
-        REDUCE_OUTPUT_DIRS("pig.reduce.output.dirs"), 
-        JOB_PARENTS("pig.parent.jobid"), 
-        JOB_FEATURE("pig.job.feature"), 
-        SCRIPT_FEATURES("pig.script.features"), 
-        JOB_ALIAS("pig.alias"), 
+        SCRIPT_ID("pig.script.id"),
+        SCRIPT("pig.script"),
+        COMMAND_LINE("pig.command.line"),
+        HADOOP_VERSION("pig.hadoop.version"),
+        VERSION("pig.version"),
+        INPUT_DIRS("pig.input.dirs"),
+        MAP_OUTPUT_DIRS("pig.map.output.dirs"),
+        REDUCE_OUTPUT_DIRS("pig.reduce.output.dirs"),
+        JOB_PARENTS("pig.parent.jobid"),
+        JOB_FEATURE("pig.job.feature"),
+        SCRIPT_FEATURES("pig.script.features"),
+        JOB_ALIAS("pig.alias"),
         JOB_ALIAS_LOCATION("pig.alias.location");
 
         private String displayStr;
@@ -101,30 +127,30 @@ public abstract class ScriptState {
      * Features used in a Pig script
      */
     public static enum PIG_FEATURE {
-        UNKNOWN, 
-        MERGE_JOIN, 
-        MERGE_SPARSE_JOIN, 
-        REPLICATED_JOIN, 
-        SKEWED_JOIN, 
-        HASH_JOIN, 
-        COLLECTED_GROUP, 
-        MERGE_COGROUP, 
-        COGROUP, 
-        GROUP_BY, 
-        ORDER_BY, 
-        RANK, 
-        DISTINCT, 
-        STREAMING, 
-        SAMPLER, 
-        INDEXER, 
-        MULTI_QUERY, 
-        FILTER, 
-        MAP_ONLY, 
-        CROSS, 
-        LIMIT, 
-        UNION, 
-        COMBINER, 
-        NATIVE, 
+        UNKNOWN,
+        MERGE_JOIN,
+        MERGE_SPARSE_JOIN,
+        REPLICATED_JOIN,
+        SKEWED_JOIN,
+        HASH_JOIN,
+        COLLECTED_GROUP,
+        MERGE_COGROUP,
+        COGROUP,
+        GROUP_BY,
+        ORDER_BY,
+        RANK,
+        DISTINCT,
+        STREAMING,
+        SAMPLER,
+        INDEXER,
+        MULTI_QUERY,
+        FILTER,
+        MAP_ONLY,
+        CROSS,
+        LIMIT,
+        UNION,
+        COMBINER,
+        NATIVE,
         MAP_PARTIALAGG;
     };
 
@@ -150,7 +176,7 @@ public abstract class ScriptState {
 
     protected PigContext pigContext;
 
-    protected List<PigProgressNotificationListener> listeners = new ArrayList<PigProgressNotificationListener>();
+    protected List<PigProgressNotificationListener> listeners = Lists.newArrayList();
 
     protected ScriptState(String id) {
         this.id = id;
@@ -186,6 +212,66 @@ public abstract class ScriptState {
         return listeners;
     }
 
+    public void emitInitialPlanNotification(OperatorPlan<?> plan) {
+        for (PigProgressNotificationListener listener: listeners) {
+            try {
+                listener.initialPlanNotification(id, plan);
+            } catch (NoSuchMethodError e) {
+                LOG.warn("PigProgressNotificationListener implementation doesn't "
+                       + "implement initialPlanNotification(..) method: "
+                       + listener.getClass().getName(), e);
+            }
+        }
+    }
+
+    public void emitLaunchStartedNotification(int numJobsToLaunch) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.launchStartedNotification(id, numJobsToLaunch);
+        }
+    }
+
+    public void emitJobsSubmittedNotification(int numJobsSubmitted) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.jobsSubmittedNotification(id, numJobsSubmitted);
+        }
+    }
+
+    public void emitJobStartedNotification(String assignedJobId) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.jobStartedNotification(id, assignedJobId);
+        }
+    }
+
+    public void emitjobFinishedNotification(JobStats jobStats) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.jobFinishedNotification(id, jobStats);
+        }
+    }
+
+    public void emitJobFailedNotification(JobStats jobStats) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.jobFailedNotification(id, jobStats);
+        }
+    }
+
+    public void emitOutputCompletedNotification(OutputStats outputStats) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.outputCompletedNotification(id, outputStats);
+        }
+    }
+
+    public void emitProgressUpdatedNotification(int progress) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.progressUpdatedNotification(id, progress);
+        }
+    }
+
+    public void emitLaunchCompletedNotification(int numJobsSucceeded) {
+        for (PigProgressNotificationListener listener: listeners) {
+            listener.launchCompletedNotification(id, numJobsSucceeded);
+        }
+    }
+
     public void setScript(File file) {
         try {
             setScript(new BufferedReader(new FileReader(file)));
@@ -344,7 +430,7 @@ public abstract class ScriptState {
             if (op.getGroupType() == GROUPTYPE.COLLECTED) {
                 feature.set(PIG_FEATURE.COLLECTED_GROUP.ordinal());
             } else if (op.getGroupType() == GROUPTYPE.MERGE) {
-                feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());                
+                feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());
             } else if (op.getGroupType() == GROUPTYPE.REGULAR) {
                 if (op.getExpressionPlans().size() > 1) {
                     feature.set(PIG_FEATURE.COGROUP.ordinal());
@@ -422,6 +508,180 @@ public abstract class ScriptState {
         public void visit(LONative n) {
             feature.set(PIG_FEATURE.NATIVE.ordinal());
         }
+    }
 
+    protected static class FeatureVisitor extends PhyPlanVisitor {
+        private BitSet feature;
+
+        public FeatureVisitor(PhysicalPlan plan, BitSet feature) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+            this.feature = feature;
+        }
+
+        @Override
+        public void visitFRJoin(POFRJoin join) throws VisitorException {
+            feature.set(PIG_FEATURE.REPLICATED_JOIN.ordinal());
+        }
+
+        @Override
+        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+            if (join.getJoinType()==LOJoin.JOINTYPE.MERGESPARSE)
+                feature.set(PIG_FEATURE.MERGE_SPARSE_JOIN.ordinal());
+            else
+                feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
+        }
+
+        @Override
+        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+                throws VisitorException {
+            feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal()); // flag MERGE_COGROUP in the feature bit set
+        }
+
+        @Override
+        public void visitCollectedGroup(POCollectedGroup mg)
+                throws VisitorException {
+            feature.set(PIG_FEATURE.COLLECTED_GROUP.ordinal());
+        }
+
+        @Override
+        public void visitDistinct(PODistinct distinct) throws VisitorException {
+            feature.set(PIG_FEATURE.DISTINCT.ordinal());
+        }
+
+        @Override
+        public void visitStream(POStream stream) throws VisitorException {
+            feature.set(PIG_FEATURE.STREAMING.ordinal());
+        }
+
+        @Override
+        public void visitSplit(POSplit split) throws VisitorException {
+            feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
+        }
+
+        @Override
+        public void visitDemux(PODemux demux) throws VisitorException {
+            feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
+        }
+
+        @Override
+        public void visitPartialAgg(POPartialAgg partAgg){
+            feature.set(PIG_FEATURE.MAP_PARTIALAGG.ordinal());
+        }
+    }
+
+    protected static class AliasVisitor extends PhyPlanVisitor {
+        private HashSet<String> aliasSet;
+        private List<String> alias;
+        private final List<String> aliasLocation;
+
+        public AliasVisitor(PhysicalPlan plan, List<String> alias, List<String> aliasLocation) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+            this.alias = alias;
+            this.aliasLocation = aliasLocation;
+            aliasSet = new HashSet<String>();
+            if (!alias.isEmpty()) {
+                for (String s : alias) aliasSet.add(s);
+            }
+        }
+
+        @Override
+        public void visitLoad(POLoad load) throws VisitorException {
+            setAlias(load);
+            super.visitLoad(load);
+        }
+
+        @Override
+        public void visitFRJoin(POFRJoin join) throws VisitorException {
+            setAlias(join);
+            super.visitFRJoin(join);
+        }
+
+        @Override
+        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+            setAlias(join);
+            super.visitMergeJoin(join);
+        }
+
+        @Override
+        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+                throws VisitorException {
+            setAlias(mergeCoGrp);
+            super.visitMergeCoGroup(mergeCoGrp);
+        }
+
+        @Override
+        public void visitCollectedGroup(POCollectedGroup mg)
+                throws VisitorException {
+            setAlias(mg);
+            super.visitCollectedGroup(mg);
+        }
+
+        @Override
+        public void visitDistinct(PODistinct distinct) throws VisitorException {
+            setAlias(distinct);
+            super.visitDistinct(distinct);
+        }
+
+        @Override
+        public void visitStream(POStream stream) throws VisitorException {
+            setAlias(stream);
+            super.visitStream(stream);
+        }
+
+        @Override
+        public void visitFilter(POFilter fl) throws VisitorException {
+            setAlias(fl);
+            super.visitFilter(fl);
+        }
+
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException {
+            setAlias(lr);
+            super.visitLocalRearrange(lr);
+        }
+
+        @Override
+        public void visitPOForEach(POForEach nfe) throws VisitorException {
+            setAlias(nfe);
+            super.visitPOForEach(nfe);
+        }
+
+        @Override
+        public void visitUnion(POUnion un) throws VisitorException {
+            setAlias(un);
+            super.visitUnion(un);
+        }
+
+        @Override
+        public void visitSort(POSort sort) throws VisitorException {
+            setAlias(sort);
+            super.visitSort(sort);
+        }
+
+        @Override
+        public void visitLimit(POLimit lim) throws VisitorException {
+            setAlias(lim);
+            super.visitLimit(lim);
+        }
+
+        @Override
+        public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+            setAlias(sk);
+            super.visitSkewedJoin(sk);
+        }
+
+        private void setAlias(PhysicalOperator op) {
+            String s = op.getAlias();
+            if (s != null) {
+                if (!aliasSet.contains(s)) {
+                    alias.add(s);
+                    aliasSet.add(s);
+                }
+            }
+            List<OriginalLocation> originalLocations = op.getOriginalLocations();
+            for (OriginalLocation originalLocation : originalLocations) {
+                aliasLocation.add(originalLocation.toString());
+            }
+        }
     }
 }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java Thu Apr 24 16:19:26 2014
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -34,40 +33,17 @@ import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.OriginalLocation;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.newplan.Operator;
-import org.apache.pig.newplan.logical.relational.LOJoin;
-import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.OutputStats;
 
+import com.google.common.collect.Maps;
 
 /**
  * ScriptStates encapsulates settings for a Pig script that runs on a hadoop
@@ -77,106 +53,34 @@ import org.apache.pig.tools.pigstats.Out
  * jobs can derive them from the job xmls.
  */
 public class MRScriptState extends ScriptState {
-    
     private static final Log LOG = LogFactory.getLog(MRScriptState.class);
 
-
     private Map<MapReduceOper, String> featureMap = null;
-    private Map<MapReduceOper, String> aliasMap = new HashMap<MapReduceOper, String>();
-    private Map<MapReduceOper, String> aliasLocationMap = new HashMap<MapReduceOper, String>();
-    
+    private Map<MapReduceOper, String> aliasMap = Maps.newHashMap();
+    private Map<MapReduceOper, String> aliasLocationMap = Maps.newHashMap();
 
     public MRScriptState(String id) {
         super(id);
     }
-    
+
     public static MRScriptState get() {
         return (MRScriptState) ScriptState.get();
     }
-    
-    public void registerListener(PigProgressNotificationListener listener) {
-        listeners.add(listener);
-    }
-
-    public List<PigProgressNotificationListener> getAllListeners() {
-        return listeners;
-    }
-
-    public void emitInitialPlanNotification(MROperPlan plan) {
-        for (PigProgressNotificationListener listener: listeners) {
-            try {
-                listener.initialPlanNotification(id, plan);
-            } catch (NoSuchMethodError e) {
-                LOG.warn("PigProgressNotificationListener implementation doesn't "
-                       + "implement initialPlanNotification(..) method: "
-                       + listener.getClass().getName(), e);
-            }
-        }
-    }
-
-    public void emitLaunchStartedNotification(int numJobsToLaunch) {
-        for (PigProgressNotificationListener listener: listeners) {
-            listener.launchStartedNotification(id, numJobsToLaunch);
-        }
-    }
-    
 
-    public void emitJobsSubmittedNotification(int numJobsSubmitted) {        
-        for (PigProgressNotificationListener listener: listeners) {
-            listener.jobsSubmittedNotification(id, numJobsSubmitted);
-        }        
-    }
-    
-    public void emitJobStartedNotification(String assignedJobId) {
-        for (PigProgressNotificationListener listener: listeners) {
-            listener.jobStartedNotification(id, assignedJobId);
-        }
-    }
-    
-    public void emitjobFinishedNotification(JobStats jobStats) {
-        for (PigProgressNotificationListener listener: listeners) {
-            listener.jobFinishedNotification(id, jobStats);
-        }
-    }
-    
-    public void emitJobFailedNotification(JobStats jobStats) {
-        for (PigProgressNotificationListener listener: listeners) {
-            listener.jobFailedNotification(id, jobStats);
-        }
-    }
-    
-    public void emitOutputCompletedNotification(OutputStats outputStats) {
-        for (PigProgressNotificationListener listener: listeners) {
-            listener.outputCompletedNotification(id, outputStats);
-        }
-    }
-    
-    public void emitProgressUpdatedNotification(int progress) {
-        for (PigProgressNotificationListener listener: listeners) {
-            listener.progressUpdatedNotification(id, progress);
-        }
-    }
-    
-    public void emitLaunchCompletedNotification(int numJobsSucceeded) {
-        for (PigProgressNotificationListener listener: listeners) {
-            listener.launchCompletedNotification(id, numJobsSucceeded);
-        }
-    }
-    
     public void addSettingsToConf(MapReduceOper mro, Configuration conf) {
         LOG.info("Pig script settings are added to the job");
         conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
         conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion());
         conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id);
-        conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());        
+        conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
         conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
-        
+
         try {
             LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
             ArrayList<String> outputDirs = new ArrayList<String>();
-            for (POStore st: stores) {  
-                outputDirs.add(st.getSFile().getFileName()); 
-            }                 
+            for (POStore st: stores) {
+                outputDirs.add(st.getSFile().getFileName());
+            }
             conf.set(PIG_PROPERTY.MAP_OUTPUT_DIRS.toString(), LoadFunc.join(outputDirs, ","));
         } catch (VisitorException e) {
             LOG.warn("unable to get the map stores", e);
@@ -185,29 +89,28 @@ public class MRScriptState extends Scrip
             try {
                 LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class);
                 ArrayList<String> outputDirs = new ArrayList<String>();
-                for (POStore st: stores) {  
-                    outputDirs.add(st.getSFile().getFileName()); 
-                }                      
+                for (POStore st: stores) {
+                    outputDirs.add(st.getSFile().getFileName());
+                }
                 conf.set(PIG_PROPERTY.REDUCE_OUTPUT_DIRS.toString(), LoadFunc.join(outputDirs, ","));
             } catch (VisitorException e) {
                 LOG.warn("unable to get the reduce stores", e);
             }
-        }        
+        }
         try {
             List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
             ArrayList<String> inputDirs = new ArrayList<String>();
             if (lds != null && lds.size() > 0){
                 for (POLoad ld : lds) {
                     inputDirs.add(ld.getLFile().getFileName());
-                }               
-                conf.set(PIG_PROPERTY.INPUT_DIRS.toString(), LoadFunc.join(inputDirs, ","));       
+                }
+                conf.set(PIG_PROPERTY.INPUT_DIRS.toString(), LoadFunc.join(inputDirs, ","));
             }
         } catch (VisitorException e) {
             LOG.warn("unable to get the map loads", e);
         }
 
         setPigFeature(mro, conf);
-
         setJobParents(mro, conf);
 
         conf.set("mapreduce.workflow.id", "pig_" + id);
@@ -290,7 +193,7 @@ public class MRScriptState extends Scrip
             aliasLocationStr += " R: "+LoadFunc.join(aliasLocation, ",");
             if (!alias.isEmpty()) {
                 Collections.sort(alias);
-            }              
+            }
         } catch (VisitorException e) {
             LOG.warn("unable to get alias", e);
         }
@@ -305,7 +208,6 @@ public class MRScriptState extends Scrip
         return aliasLocationMap.get(mro);
     }
 
-
     public String getPigFeature(MapReduceOper mro) {
         if (featureMap == null) {
             featureMap = new HashMap<MapReduceOper, String>();
@@ -348,8 +250,8 @@ public class MRScriptState extends Scrip
             else{// if it is NATIVE MR , don't explore its plans
                 try {
                     new FeatureVisitor(mro.mapPlan, feature).visit();
-                    if (mro.reducePlan.isEmpty()) { 
-                        feature.set(PIG_FEATURE.MAP_ONLY.ordinal());                    
+                    if (mro.reducePlan.isEmpty()) {
+                        feature.set(PIG_FEATURE.MAP_ONLY.ordinal());
                     } else {
                         new FeatureVisitor(mro.reducePlan, feature).visit();
                     }
@@ -359,7 +261,7 @@ public class MRScriptState extends Scrip
             }
             StringBuilder sb = new StringBuilder();
             for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
-                if (sb.length() > 0) sb.append(",");             
+                if (sb.length() > 0) sb.append(",");
                 sb.append(PIG_FEATURE.values()[i].name());
             }
             retStr = sb.toString();
@@ -368,186 +270,4 @@ public class MRScriptState extends Scrip
         return retStr;
     }
 
-
-    private static class FeatureVisitor extends PhyPlanVisitor {
-        private BitSet feature;
-
-        public FeatureVisitor(PhysicalPlan plan, BitSet feature) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
-                    plan));
-            this.feature = feature;
-        }
-
-        @Override
-        public void visitFRJoin(POFRJoin join) throws VisitorException {
-            feature.set(PIG_FEATURE.REPLICATED_JOIN.ordinal());
-        }
-
-        @Override
-        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
-            if (join.getJoinType()==LOJoin.JOINTYPE.MERGESPARSE)
-                feature.set(PIG_FEATURE.MERGE_SPARSE_JOIN.ordinal());
-            else
-                feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
-        }
-
-        @Override
-        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
-                throws VisitorException {
-            feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());;
-        }
-
-        @Override
-        public void visitCollectedGroup(POCollectedGroup mg)
-                throws VisitorException {
-            feature.set(PIG_FEATURE.COLLECTED_GROUP.ordinal());
-        }
-
-        @Override
-        public void visitDistinct(PODistinct distinct) throws VisitorException {
-            feature.set(PIG_FEATURE.DISTINCT.ordinal());
-        }
-
-        @Override
-        public void visitStream(POStream stream) throws VisitorException {
-            feature.set(PIG_FEATURE.STREAMING.ordinal());
-        }
-
-        @Override
-        public void visitSplit(POSplit split) throws VisitorException {
-            feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
-        }
-
-        @Override
-        public void visitDemux(PODemux demux) throws VisitorException {
-            feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
-        }
-
-        @Override
-        public void visitPartialAgg(POPartialAgg partAgg){
-            feature.set(PIG_FEATURE.MAP_PARTIALAGG.ordinal());
-        }
-
-    }
-
-    private static class AliasVisitor extends PhyPlanVisitor {
-
-        private HashSet<String> aliasSet;
-
-        private List<String> alias;
-
-        private final List<String> aliasLocation;
-
-        public AliasVisitor(PhysicalPlan plan, List<String> alias, List<String> aliasLocation) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
-                    plan));
-            this.alias = alias;
-            this.aliasLocation = aliasLocation;
-            aliasSet = new HashSet<String>();
-            if (!alias.isEmpty()) {
-                for (String s : alias) aliasSet.add(s);
-            }
-        }
-
-        @Override
-        public void visitLoad(POLoad load) throws VisitorException {
-            setAlias(load);
-            super.visitLoad(load);
-        }
-
-        @Override
-        public void visitFRJoin(POFRJoin join) throws VisitorException {
-            setAlias(join);
-            super.visitFRJoin(join);
-        }
-
-        @Override
-        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
-            setAlias(join);
-            super.visitMergeJoin(join);
-        }
-
-        @Override
-        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
-                throws VisitorException {
-            setAlias(mergeCoGrp);
-            super.visitMergeCoGroup(mergeCoGrp);
-        }
-
-        @Override
-        public void visitCollectedGroup(POCollectedGroup mg)
-                throws VisitorException {
-            setAlias(mg);
-            super.visitCollectedGroup(mg);
-        }
-
-        @Override
-        public void visitDistinct(PODistinct distinct) throws VisitorException {
-            setAlias(distinct);
-            super.visitDistinct(distinct);
-        }
-
-        @Override
-        public void visitStream(POStream stream) throws VisitorException {
-            setAlias(stream);
-            super.visitStream(stream);
-        }
-
-        @Override
-        public void visitFilter(POFilter fl) throws VisitorException {
-            setAlias(fl);
-            super.visitFilter(fl);
-        }
-
-        @Override
-        public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException {
-            setAlias(lr);
-            super.visitLocalRearrange(lr);
-        }
-
-        @Override
-        public void visitPOForEach(POForEach nfe) throws VisitorException {
-            setAlias(nfe);
-            super.visitPOForEach(nfe);
-        }
-
-        @Override
-        public void visitUnion(POUnion un) throws VisitorException {
-            setAlias(un);
-            super.visitUnion(un);
-        }
-
-        @Override
-        public void visitSort(POSort sort) throws VisitorException {
-            setAlias(sort);
-            super.visitSort(sort);
-        }
-
-        @Override
-        public void visitLimit(POLimit lim) throws VisitorException {
-            setAlias(lim);
-            super.visitLimit(lim);
-        }
-
-        @Override
-        public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
-            setAlias(sk);
-            super.visitSkewedJoin(sk);
-        }
-
-        private void setAlias(PhysicalOperator op) {
-            String s = op.getAlias();
-            if (s != null) {
-                if (!aliasSet.contains(s)) {
-                    alias.add(s);
-                    aliasSet.add(s);
-                }
-            }
-            List<OriginalLocation> originalLocations = op.getOriginalLocations();
-            for (OriginalLocation originalLocation : originalLocations) {
-                aliasLocation.add(originalLocation.toString());
-            }
-        }
-    }
-
 }

Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Thu Apr 24 16:19:26 2014
@@ -32,8 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
@@ -41,9 +39,9 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigRunner;
 import org.apache.pig.PigRunner.ReturnCode;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
@@ -618,7 +616,7 @@ public class TestPigRunner {
         Util.deleteFile(cluster, OUTPUT_FILE);
         PigContext ctx = stats.getPigContext();
 
-        Assert.assertNotNull(ctx);
+        assertNotNull(ctx);
 
         assertTrue(ctx.extraJars.contains(ClassLoader.getSystemResource("pig-withouthadoop.jar")));
         assertTrue("default", ctx.getProperties().getProperty("mapred.job.queue.name")!=null && ctx.getProperties().getProperty("mapred.job.queue.name").equals("default")||
@@ -675,8 +673,8 @@ public class TestPigRunner {
            String[] args = { "-x", "local", "-c", PIG_FILE };
            PigStats stats = PigRunner.run(args, null);
        
-           Assert.assertTrue(stats.isSuccessful());
-           Assert.assertEquals( 0, stats.getReturnCode() );
+           assertTrue(stats.isSuccessful());
+           assertEquals( 0, stats.getReturnCode() );
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
@@ -965,7 +963,7 @@ public class TestPigRunner {
         private static final int JobFinished = 3;
 
         @Override
-        public void initialPlanNotification(String id, MROperPlan plan) {
+        public void initialPlanNotification(String id, OperatorPlan<?> plan) {
             System.out.println("id: " + id + " planNodes: " + plan.getKeys().size());
             assertNotNull(plan);
         }