You are viewing a plain-text version of this content. The link to its canonical (HTML) version was not preserved in this copy.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/24 18:19:27 UTC
svn commit: r1589784 - in /pig/trunk: ./ src/org/apache/pig/scripting/
src/org/apache/pig/tools/pigstats/
src/org/apache/pig/tools/pigstats/mapreduce/ test/org/apache/pig/test/
Author: cheolsoo
Date: Thu Apr 24 16:19:26 2014
New Revision: 1589784
URL: http://svn.apache.org/r1589784
Log:
PIG-3898: Refactor PPNL for non-MR execution engine (cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java
pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java
pig/trunk/test/org/apache/pig/test/TestPigRunner.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Apr 24 16:19:26 2014
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
+PIG-3898: Refactor PPNL for non-MR execution engine (cheolsoo)
+
PIG-3485: Remove CastUtils.bytesToMap(byte[] b) method from LoadCaster interface (cheolsoo)
PIG-3419: Pluggable Execution Engine (achalsoni81 via cheolsoo)
Modified: pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java Thu Apr 24 16:19:26 2014
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
@@ -44,7 +44,7 @@ class SyncProgressNotificationAdaptor im
for (PigProgressNotificationListener listener : listeners) {
listener.jobFailedNotification(scriptId, jobStats);
}
- }
+ }
}
@Override
@@ -53,7 +53,7 @@ class SyncProgressNotificationAdaptor im
for (PigProgressNotificationListener listener : listeners) {
listener.jobFinishedNotification(scriptId, jobStats);
}
- }
+ }
}
@Override
@@ -62,7 +62,7 @@ class SyncProgressNotificationAdaptor im
for (PigProgressNotificationListener listener : listeners) {
listener.jobStartedNotification(scriptId, assignedJobId);
}
- }
+ }
}
@Override
@@ -71,7 +71,7 @@ class SyncProgressNotificationAdaptor im
for (PigProgressNotificationListener listener : listeners) {
listener.jobsSubmittedNotification(scriptId, numJobsSubmitted);
}
- }
+ }
}
@Override
@@ -81,11 +81,11 @@ class SyncProgressNotificationAdaptor im
for (PigProgressNotificationListener listener : listeners) {
listener.launchCompletedNotification(scriptId, numJobsSucceeded);
}
- }
+ }
}
@Override
- public void initialPlanNotification(String scriptId, MROperPlan plan) {
+ public void initialPlanNotification(String scriptId, OperatorPlan<?> plan) {
synchronized (listeners) {
for (PigProgressNotificationListener listener : listeners) {
try {
@@ -105,7 +105,7 @@ class SyncProgressNotificationAdaptor im
for (PigProgressNotificationListener listener : listeners) {
listener.launchStartedNotification(scriptId, numJobsToLaunch);
}
- }
+ }
}
@Override
@@ -115,7 +115,7 @@ class SyncProgressNotificationAdaptor im
for (PigProgressNotificationListener listener : listeners) {
listener.outputCompletedNotification(scriptId, outputStats);
}
- }
+ }
}
@Override
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java Thu Apr 24 16:19:26 2014
@@ -18,9 +18,9 @@
package org.apache.pig.tools.pigstats;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.PigRunner;
@@ -33,66 +33,66 @@ import org.apache.pig.PigRunner;
public interface PigProgressNotificationListener extends java.util.EventListener {
/**
- * Invoked before any MR jobs are run with the plan that is to be executed.
+ * Invoked before any Hadoop jobs are run with the plan that is to be executed.
*
* @param scriptId the unique id of the script
- * @param plan the MROperPlan that is to be executed
+ * @param plan the OperatorPlan that is to be executed
*/
- public void initialPlanNotification(String scriptId, MROperPlan plan);
+ public void initialPlanNotification(String scriptId, OperatorPlan<?> plan);
/**
- * Invoked just before launching MR jobs spawned by the script.
+ * Invoked just before launching Hadoop jobs spawned by the script.
* @param scriptId the unique id of the script
- * @param numJobsToLaunch the total number of MR jobs spawned by the script
+ * @param numJobsToLaunch the total number of Hadoop jobs spawned by the script
*/
public void launchStartedNotification(String scriptId, int numJobsToLaunch);
-
+
/**
- * Invoked just before submitting a batch of MR jobs.
+ * Invoked just before submitting a batch of Hadoop jobs.
* @param scriptId the unique id of the script
- * @param numJobsSubmitted the number of MR jobs in the batch
+ * @param numJobsSubmitted the number of Hadoop jobs in the batch
*/
public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted);
-
+
/**
- * Invoked after a MR job is started.
+ * Invoked after a Hadoop job is started.
* @param scriptId the unique id of the script
- * @param assignedJobId the MR job id
+ * @param assignedJobId the Hadoop job id
*/
public void jobStartedNotification(String scriptId, String assignedJobId);
-
+
/**
- * Invoked just after a MR job is completed successfully.
+ * Invoked just after a Hadoop job is completed successfully.
* @param scriptId the unique id of the script
- * @param jobStats the {@link JobStats} object associated with the MR job
+ * @param jobStats the {@link JobStats} object associated with the Hadoop job
*/
public void jobFinishedNotification(String scriptId, JobStats jobStats);
-
+
/**
- * Invoked when a MR job fails.
+ * Invoked when a Hadoop job fails.
* @param scriptId the unique id of the script
- * @param jobStats the {@link JobStats} object associated with the MR job
+ * @param jobStats the {@link JobStats} object associated with the Hadoop job
*/
public void jobFailedNotification(String scriptId, JobStats jobStats);
-
+
/**
* Invoked just after an output is successfully written.
* @param scriptId the unique id of the script
* @param outputStats the {@link OutputStats} object associated with the output
*/
public void outputCompletedNotification(String scriptId, OutputStats outputStats);
-
+
/**
* Invoked to update the execution progress.
* @param scriptId the unique id of the script
* @param progress the percentage of the execution progress
*/
public void progressUpdatedNotification(String scriptId, int progress);
-
+
/**
- * Invoked just after all MR jobs spawned by the script are completed.
+ * Invoked just after all Hadoop jobs spawned by the script are completed.
* @param scriptId the unique id of the script
- * @param numJobsSucceeded the total number of MR jobs succeeded
+ * @param numJobsSucceeded the total number of Hadoop jobs succeeded
*/
public void launchCompletedNotification(String scriptId, int numJobsSucceeded);
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Thu Apr 24 16:19:26 2014
@@ -22,8 +22,8 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.BitSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -36,8 +36,32 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.VersionInfo;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.OriginalLocation;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE;
@@ -58,6 +82,8 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
+import com.google.common.collect.Lists;
+
/**
* ScriptStates encapsulates settings for a Pig script that runs on a hadoop
* cluster. These settings are added to all MR jobs spawned by the script and in
@@ -71,18 +97,18 @@ public abstract class ScriptState {
* Keys of Pig settings added to Jobs
*/
protected enum PIG_PROPERTY {
- SCRIPT_ID("pig.script.id"),
- SCRIPT("pig.script"),
- COMMAND_LINE("pig.command.line"),
- HADOOP_VERSION("pig.hadoop.version"),
- VERSION("pig.version"),
- INPUT_DIRS("pig.input.dirs"),
- MAP_OUTPUT_DIRS("pig.map.output.dirs"),
- REDUCE_OUTPUT_DIRS("pig.reduce.output.dirs"),
- JOB_PARENTS("pig.parent.jobid"),
- JOB_FEATURE("pig.job.feature"),
- SCRIPT_FEATURES("pig.script.features"),
- JOB_ALIAS("pig.alias"),
+ SCRIPT_ID("pig.script.id"),
+ SCRIPT("pig.script"),
+ COMMAND_LINE("pig.command.line"),
+ HADOOP_VERSION("pig.hadoop.version"),
+ VERSION("pig.version"),
+ INPUT_DIRS("pig.input.dirs"),
+ MAP_OUTPUT_DIRS("pig.map.output.dirs"),
+ REDUCE_OUTPUT_DIRS("pig.reduce.output.dirs"),
+ JOB_PARENTS("pig.parent.jobid"),
+ JOB_FEATURE("pig.job.feature"),
+ SCRIPT_FEATURES("pig.script.features"),
+ JOB_ALIAS("pig.alias"),
JOB_ALIAS_LOCATION("pig.alias.location");
private String displayStr;
@@ -101,30 +127,30 @@ public abstract class ScriptState {
* Features used in a Pig script
*/
public static enum PIG_FEATURE {
- UNKNOWN,
- MERGE_JOIN,
- MERGE_SPARSE_JOIN,
- REPLICATED_JOIN,
- SKEWED_JOIN,
- HASH_JOIN,
- COLLECTED_GROUP,
- MERGE_COGROUP,
- COGROUP,
- GROUP_BY,
- ORDER_BY,
- RANK,
- DISTINCT,
- STREAMING,
- SAMPLER,
- INDEXER,
- MULTI_QUERY,
- FILTER,
- MAP_ONLY,
- CROSS,
- LIMIT,
- UNION,
- COMBINER,
- NATIVE,
+ UNKNOWN,
+ MERGE_JOIN,
+ MERGE_SPARSE_JOIN,
+ REPLICATED_JOIN,
+ SKEWED_JOIN,
+ HASH_JOIN,
+ COLLECTED_GROUP,
+ MERGE_COGROUP,
+ COGROUP,
+ GROUP_BY,
+ ORDER_BY,
+ RANK,
+ DISTINCT,
+ STREAMING,
+ SAMPLER,
+ INDEXER,
+ MULTI_QUERY,
+ FILTER,
+ MAP_ONLY,
+ CROSS,
+ LIMIT,
+ UNION,
+ COMBINER,
+ NATIVE,
MAP_PARTIALAGG;
};
@@ -150,7 +176,7 @@ public abstract class ScriptState {
protected PigContext pigContext;
- protected List<PigProgressNotificationListener> listeners = new ArrayList<PigProgressNotificationListener>();
+ protected List<PigProgressNotificationListener> listeners = Lists.newArrayList();
protected ScriptState(String id) {
this.id = id;
@@ -186,6 +212,66 @@ public abstract class ScriptState {
return listeners;
}
+ public void emitInitialPlanNotification(OperatorPlan<?> plan) {
+ for (PigProgressNotificationListener listener: listeners) {
+ try {
+ listener.initialPlanNotification(id, plan);
+ } catch (NoSuchMethodError e) {
+ LOG.warn("PigProgressNotificationListener implementation doesn't "
+ + "implement initialPlanNotification(..) method: "
+ + listener.getClass().getName(), e);
+ }
+ }
+ }
+
+ public void emitLaunchStartedNotification(int numJobsToLaunch) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.launchStartedNotification(id, numJobsToLaunch);
+ }
+ }
+
+ public void emitJobsSubmittedNotification(int numJobsSubmitted) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.jobsSubmittedNotification(id, numJobsSubmitted);
+ }
+ }
+
+ public void emitJobStartedNotification(String assignedJobId) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.jobStartedNotification(id, assignedJobId);
+ }
+ }
+
+ public void emitjobFinishedNotification(JobStats jobStats) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.jobFinishedNotification(id, jobStats);
+ }
+ }
+
+ public void emitJobFailedNotification(JobStats jobStats) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.jobFailedNotification(id, jobStats);
+ }
+ }
+
+ public void emitOutputCompletedNotification(OutputStats outputStats) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.outputCompletedNotification(id, outputStats);
+ }
+ }
+
+ public void emitProgressUpdatedNotification(int progress) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.progressUpdatedNotification(id, progress);
+ }
+ }
+
+ public void emitLaunchCompletedNotification(int numJobsSucceeded) {
+ for (PigProgressNotificationListener listener: listeners) {
+ listener.launchCompletedNotification(id, numJobsSucceeded);
+ }
+ }
+
public void setScript(File file) {
try {
setScript(new BufferedReader(new FileReader(file)));
@@ -344,7 +430,7 @@ public abstract class ScriptState {
if (op.getGroupType() == GROUPTYPE.COLLECTED) {
feature.set(PIG_FEATURE.COLLECTED_GROUP.ordinal());
} else if (op.getGroupType() == GROUPTYPE.MERGE) {
- feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());
+ feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());
} else if (op.getGroupType() == GROUPTYPE.REGULAR) {
if (op.getExpressionPlans().size() > 1) {
feature.set(PIG_FEATURE.COGROUP.ordinal());
@@ -422,6 +508,180 @@ public abstract class ScriptState {
public void visit(LONative n) {
feature.set(PIG_FEATURE.NATIVE.ordinal());
}
+ }
+ protected static class FeatureVisitor extends PhyPlanVisitor {
+ private BitSet feature;
+
+ public FeatureVisitor(PhysicalPlan plan, BitSet feature) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+ this.feature = feature;
+ }
+
+ @Override
+ public void visitFRJoin(POFRJoin join) throws VisitorException {
+ feature.set(PIG_FEATURE.REPLICATED_JOIN.ordinal());
+ }
+
+ @Override
+ public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+ if (join.getJoinType()==LOJoin.JOINTYPE.MERGESPARSE)
+ feature.set(PIG_FEATURE.MERGE_SPARSE_JOIN.ordinal());
+ else
+ feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
+ }
+
+ @Override
+ public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+ throws VisitorException {
+ feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());;
+ }
+
+ @Override
+ public void visitCollectedGroup(POCollectedGroup mg)
+ throws VisitorException {
+ feature.set(PIG_FEATURE.COLLECTED_GROUP.ordinal());
+ }
+
+ @Override
+ public void visitDistinct(PODistinct distinct) throws VisitorException {
+ feature.set(PIG_FEATURE.DISTINCT.ordinal());
+ }
+
+ @Override
+ public void visitStream(POStream stream) throws VisitorException {
+ feature.set(PIG_FEATURE.STREAMING.ordinal());
+ }
+
+ @Override
+ public void visitSplit(POSplit split) throws VisitorException {
+ feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
+ }
+
+ @Override
+ public void visitDemux(PODemux demux) throws VisitorException {
+ feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
+ }
+
+ @Override
+ public void visitPartialAgg(POPartialAgg partAgg){
+ feature.set(PIG_FEATURE.MAP_PARTIALAGG.ordinal());
+ }
+ }
+
+ protected static class AliasVisitor extends PhyPlanVisitor {
+ private HashSet<String> aliasSet;
+ private List<String> alias;
+ private final List<String> aliasLocation;
+
+ public AliasVisitor(PhysicalPlan plan, List<String> alias, List<String> aliasLocation) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+ this.alias = alias;
+ this.aliasLocation = aliasLocation;
+ aliasSet = new HashSet<String>();
+ if (!alias.isEmpty()) {
+ for (String s : alias) aliasSet.add(s);
+ }
+ }
+
+ @Override
+ public void visitLoad(POLoad load) throws VisitorException {
+ setAlias(load);
+ super.visitLoad(load);
+ }
+
+ @Override
+ public void visitFRJoin(POFRJoin join) throws VisitorException {
+ setAlias(join);
+ super.visitFRJoin(join);
+ }
+
+ @Override
+ public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+ setAlias(join);
+ super.visitMergeJoin(join);
+ }
+
+ @Override
+ public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+ throws VisitorException {
+ setAlias(mergeCoGrp);
+ super.visitMergeCoGroup(mergeCoGrp);
+ }
+
+ @Override
+ public void visitCollectedGroup(POCollectedGroup mg)
+ throws VisitorException {
+ setAlias(mg);
+ super.visitCollectedGroup(mg);
+ }
+
+ @Override
+ public void visitDistinct(PODistinct distinct) throws VisitorException {
+ setAlias(distinct);
+ super.visitDistinct(distinct);
+ }
+
+ @Override
+ public void visitStream(POStream stream) throws VisitorException {
+ setAlias(stream);
+ super.visitStream(stream);
+ }
+
+ @Override
+ public void visitFilter(POFilter fl) throws VisitorException {
+ setAlias(fl);
+ super.visitFilter(fl);
+ }
+
+ @Override
+ public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException {
+ setAlias(lr);
+ super.visitLocalRearrange(lr);
+ }
+
+ @Override
+ public void visitPOForEach(POForEach nfe) throws VisitorException {
+ setAlias(nfe);
+ super.visitPOForEach(nfe);
+ }
+
+ @Override
+ public void visitUnion(POUnion un) throws VisitorException {
+ setAlias(un);
+ super.visitUnion(un);
+ }
+
+ @Override
+ public void visitSort(POSort sort) throws VisitorException {
+ setAlias(sort);
+ super.visitSort(sort);
+ }
+
+ @Override
+ public void visitLimit(POLimit lim) throws VisitorException {
+ setAlias(lim);
+ super.visitLimit(lim);
+ }
+
+ @Override
+ public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+ setAlias(sk);
+ super.visitSkewedJoin(sk);
+ }
+
+ private void setAlias(PhysicalOperator op) {
+ String s = op.getAlias();
+ if (s != null) {
+ if (!aliasSet.contains(s)) {
+ alias.add(s);
+ aliasSet.add(s);
+ }
+ }
+ List<OriginalLocation> originalLocations = op.getOriginalLocations();
+ for (OriginalLocation originalLocation : originalLocations) {
+ aliasLocation.add(originalLocation.toString());
+ }
+ }
}
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java Thu Apr 24 16:19:26 2014
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -34,40 +33,17 @@ import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.OriginalLocation;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.newplan.Operator;
-import org.apache.pig.newplan.logical.relational.LOJoin;
-import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.OutputStats;
+import com.google.common.collect.Maps;
/**
* ScriptStates encapsulates settings for a Pig script that runs on a hadoop
@@ -77,106 +53,34 @@ import org.apache.pig.tools.pigstats.Out
* jobs can derive them from the job xmls.
*/
public class MRScriptState extends ScriptState {
-
private static final Log LOG = LogFactory.getLog(MRScriptState.class);
-
private Map<MapReduceOper, String> featureMap = null;
- private Map<MapReduceOper, String> aliasMap = new HashMap<MapReduceOper, String>();
- private Map<MapReduceOper, String> aliasLocationMap = new HashMap<MapReduceOper, String>();
-
+ private Map<MapReduceOper, String> aliasMap = Maps.newHashMap();
+ private Map<MapReduceOper, String> aliasLocationMap = Maps.newHashMap();
public MRScriptState(String id) {
super(id);
}
-
+
public static MRScriptState get() {
return (MRScriptState) ScriptState.get();
}
-
- public void registerListener(PigProgressNotificationListener listener) {
- listeners.add(listener);
- }
-
- public List<PigProgressNotificationListener> getAllListeners() {
- return listeners;
- }
-
- public void emitInitialPlanNotification(MROperPlan plan) {
- for (PigProgressNotificationListener listener: listeners) {
- try {
- listener.initialPlanNotification(id, plan);
- } catch (NoSuchMethodError e) {
- LOG.warn("PigProgressNotificationListener implementation doesn't "
- + "implement initialPlanNotification(..) method: "
- + listener.getClass().getName(), e);
- }
- }
- }
-
- public void emitLaunchStartedNotification(int numJobsToLaunch) {
- for (PigProgressNotificationListener listener: listeners) {
- listener.launchStartedNotification(id, numJobsToLaunch);
- }
- }
-
- public void emitJobsSubmittedNotification(int numJobsSubmitted) {
- for (PigProgressNotificationListener listener: listeners) {
- listener.jobsSubmittedNotification(id, numJobsSubmitted);
- }
- }
-
- public void emitJobStartedNotification(String assignedJobId) {
- for (PigProgressNotificationListener listener: listeners) {
- listener.jobStartedNotification(id, assignedJobId);
- }
- }
-
- public void emitjobFinishedNotification(JobStats jobStats) {
- for (PigProgressNotificationListener listener: listeners) {
- listener.jobFinishedNotification(id, jobStats);
- }
- }
-
- public void emitJobFailedNotification(JobStats jobStats) {
- for (PigProgressNotificationListener listener: listeners) {
- listener.jobFailedNotification(id, jobStats);
- }
- }
-
- public void emitOutputCompletedNotification(OutputStats outputStats) {
- for (PigProgressNotificationListener listener: listeners) {
- listener.outputCompletedNotification(id, outputStats);
- }
- }
-
- public void emitProgressUpdatedNotification(int progress) {
- for (PigProgressNotificationListener listener: listeners) {
- listener.progressUpdatedNotification(id, progress);
- }
- }
-
- public void emitLaunchCompletedNotification(int numJobsSucceeded) {
- for (PigProgressNotificationListener listener: listeners) {
- listener.launchCompletedNotification(id, numJobsSucceeded);
- }
- }
-
public void addSettingsToConf(MapReduceOper mro, Configuration conf) {
LOG.info("Pig script settings are added to the job");
conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion());
conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id);
- conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
+ conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
-
+
try {
LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
ArrayList<String> outputDirs = new ArrayList<String>();
- for (POStore st: stores) {
- outputDirs.add(st.getSFile().getFileName());
- }
+ for (POStore st: stores) {
+ outputDirs.add(st.getSFile().getFileName());
+ }
conf.set(PIG_PROPERTY.MAP_OUTPUT_DIRS.toString(), LoadFunc.join(outputDirs, ","));
} catch (VisitorException e) {
LOG.warn("unable to get the map stores", e);
@@ -185,29 +89,28 @@ public class MRScriptState extends Scrip
try {
LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class);
ArrayList<String> outputDirs = new ArrayList<String>();
- for (POStore st: stores) {
- outputDirs.add(st.getSFile().getFileName());
- }
+ for (POStore st: stores) {
+ outputDirs.add(st.getSFile().getFileName());
+ }
conf.set(PIG_PROPERTY.REDUCE_OUTPUT_DIRS.toString(), LoadFunc.join(outputDirs, ","));
} catch (VisitorException e) {
LOG.warn("unable to get the reduce stores", e);
}
- }
+ }
try {
List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
ArrayList<String> inputDirs = new ArrayList<String>();
if (lds != null && lds.size() > 0){
for (POLoad ld : lds) {
inputDirs.add(ld.getLFile().getFileName());
- }
- conf.set(PIG_PROPERTY.INPUT_DIRS.toString(), LoadFunc.join(inputDirs, ","));
+ }
+ conf.set(PIG_PROPERTY.INPUT_DIRS.toString(), LoadFunc.join(inputDirs, ","));
}
} catch (VisitorException e) {
LOG.warn("unable to get the map loads", e);
}
setPigFeature(mro, conf);
-
setJobParents(mro, conf);
conf.set("mapreduce.workflow.id", "pig_" + id);
@@ -290,7 +193,7 @@ public class MRScriptState extends Scrip
aliasLocationStr += " R: "+LoadFunc.join(aliasLocation, ",");
if (!alias.isEmpty()) {
Collections.sort(alias);
- }
+ }
} catch (VisitorException e) {
LOG.warn("unable to get alias", e);
}
@@ -305,7 +208,6 @@ public class MRScriptState extends Scrip
return aliasLocationMap.get(mro);
}
-
public String getPigFeature(MapReduceOper mro) {
if (featureMap == null) {
featureMap = new HashMap<MapReduceOper, String>();
@@ -348,8 +250,8 @@ public class MRScriptState extends Scrip
else{// if it is NATIVE MR , don't explore its plans
try {
new FeatureVisitor(mro.mapPlan, feature).visit();
- if (mro.reducePlan.isEmpty()) {
- feature.set(PIG_FEATURE.MAP_ONLY.ordinal());
+ if (mro.reducePlan.isEmpty()) {
+ feature.set(PIG_FEATURE.MAP_ONLY.ordinal());
} else {
new FeatureVisitor(mro.reducePlan, feature).visit();
}
@@ -359,7 +261,7 @@ public class MRScriptState extends Scrip
}
StringBuilder sb = new StringBuilder();
for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
- if (sb.length() > 0) sb.append(",");
+ if (sb.length() > 0) sb.append(",");
sb.append(PIG_FEATURE.values()[i].name());
}
retStr = sb.toString();
@@ -368,186 +270,4 @@ public class MRScriptState extends Scrip
return retStr;
}
-
- private static class FeatureVisitor extends PhyPlanVisitor {
- private BitSet feature;
-
- public FeatureVisitor(PhysicalPlan plan, BitSet feature) {
- super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
- plan));
- this.feature = feature;
- }
-
- @Override
- public void visitFRJoin(POFRJoin join) throws VisitorException {
- feature.set(PIG_FEATURE.REPLICATED_JOIN.ordinal());
- }
-
- @Override
- public void visitMergeJoin(POMergeJoin join) throws VisitorException {
- if (join.getJoinType()==LOJoin.JOINTYPE.MERGESPARSE)
- feature.set(PIG_FEATURE.MERGE_SPARSE_JOIN.ordinal());
- else
- feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
- }
-
- @Override
- public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
- throws VisitorException {
- feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());;
- }
-
- @Override
- public void visitCollectedGroup(POCollectedGroup mg)
- throws VisitorException {
- feature.set(PIG_FEATURE.COLLECTED_GROUP.ordinal());
- }
-
- @Override
- public void visitDistinct(PODistinct distinct) throws VisitorException {
- feature.set(PIG_FEATURE.DISTINCT.ordinal());
- }
-
- @Override
- public void visitStream(POStream stream) throws VisitorException {
- feature.set(PIG_FEATURE.STREAMING.ordinal());
- }
-
- @Override
- public void visitSplit(POSplit split) throws VisitorException {
- feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
- }
-
- @Override
- public void visitDemux(PODemux demux) throws VisitorException {
- feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
- }
-
- @Override
- public void visitPartialAgg(POPartialAgg partAgg){
- feature.set(PIG_FEATURE.MAP_PARTIALAGG.ordinal());
- }
-
- }
-
- private static class AliasVisitor extends PhyPlanVisitor {
-
- private HashSet<String> aliasSet;
-
- private List<String> alias;
-
- private final List<String> aliasLocation;
-
- public AliasVisitor(PhysicalPlan plan, List<String> alias, List<String> aliasLocation) {
- super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
- plan));
- this.alias = alias;
- this.aliasLocation = aliasLocation;
- aliasSet = new HashSet<String>();
- if (!alias.isEmpty()) {
- for (String s : alias) aliasSet.add(s);
- }
- }
-
- @Override
- public void visitLoad(POLoad load) throws VisitorException {
- setAlias(load);
- super.visitLoad(load);
- }
-
- @Override
- public void visitFRJoin(POFRJoin join) throws VisitorException {
- setAlias(join);
- super.visitFRJoin(join);
- }
-
- @Override
- public void visitMergeJoin(POMergeJoin join) throws VisitorException {
- setAlias(join);
- super.visitMergeJoin(join);
- }
-
- @Override
- public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
- throws VisitorException {
- setAlias(mergeCoGrp);
- super.visitMergeCoGroup(mergeCoGrp);
- }
-
- @Override
- public void visitCollectedGroup(POCollectedGroup mg)
- throws VisitorException {
- setAlias(mg);
- super.visitCollectedGroup(mg);
- }
-
- @Override
- public void visitDistinct(PODistinct distinct) throws VisitorException {
- setAlias(distinct);
- super.visitDistinct(distinct);
- }
-
- @Override
- public void visitStream(POStream stream) throws VisitorException {
- setAlias(stream);
- super.visitStream(stream);
- }
-
- @Override
- public void visitFilter(POFilter fl) throws VisitorException {
- setAlias(fl);
- super.visitFilter(fl);
- }
-
- @Override
- public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException {
- setAlias(lr);
- super.visitLocalRearrange(lr);
- }
-
- @Override
- public void visitPOForEach(POForEach nfe) throws VisitorException {
- setAlias(nfe);
- super.visitPOForEach(nfe);
- }
-
- @Override
- public void visitUnion(POUnion un) throws VisitorException {
- setAlias(un);
- super.visitUnion(un);
- }
-
- @Override
- public void visitSort(POSort sort) throws VisitorException {
- setAlias(sort);
- super.visitSort(sort);
- }
-
- @Override
- public void visitLimit(POLimit lim) throws VisitorException {
- setAlias(lim);
- super.visitLimit(lim);
- }
-
- @Override
- public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
- setAlias(sk);
- super.visitSkewedJoin(sk);
- }
-
- private void setAlias(PhysicalOperator op) {
- String s = op.getAlias();
- if (s != null) {
- if (!aliasSet.contains(s)) {
- alias.add(s);
- aliasSet.add(s);
- }
- }
- List<OriginalLocation> originalLocations = op.getOriginalLocations();
- for (OriginalLocation originalLocation : originalLocations) {
- aliasLocation.add(originalLocation.toString());
- }
- }
- }
-
}
Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1589784&r1=1589783&r2=1589784&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Thu Apr 24 16:19:26 2014
@@ -32,8 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import junit.framework.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
@@ -41,9 +39,9 @@ import org.apache.pig.ExecType;
import org.apache.pig.PigRunner;
import org.apache.pig.PigRunner.ReturnCode;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.newplan.Operator;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
@@ -618,7 +616,7 @@ public class TestPigRunner {
Util.deleteFile(cluster, OUTPUT_FILE);
PigContext ctx = stats.getPigContext();
- Assert.assertNotNull(ctx);
+ assertNotNull(ctx);
assertTrue(ctx.extraJars.contains(ClassLoader.getSystemResource("pig-withouthadoop.jar")));
assertTrue("default", ctx.getProperties().getProperty("mapred.job.queue.name")!=null && ctx.getProperties().getProperty("mapred.job.queue.name").equals("default")||
@@ -675,8 +673,8 @@ public class TestPigRunner {
String[] args = { "-x", "local", "-c", PIG_FILE };
PigStats stats = PigRunner.run(args, null);
- Assert.assertTrue(stats.isSuccessful());
- Assert.assertEquals( 0, stats.getReturnCode() );
+ assertTrue(stats.isSuccessful());
+ assertEquals( 0, stats.getReturnCode() );
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
@@ -965,7 +963,7 @@ public class TestPigRunner {
private static final int JobFinished = 3;
@Override
- public void initialPlanNotification(String id, MROperPlan plan) {
+ public void initialPlanNotification(String id, OperatorPlan<?> plan) {
System.out.println("id: " + id + " planNodes: " + plan.getKeys().size());
assertNotNull(plan);
}