You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/05/16 18:02:27 UTC
svn commit: r1595251 - in /pig/branches/tez:
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/newplan/ src/org/apache/pig/tools/pigstats/tez/
test/org/apache/pig/test/
Author: cheolsoo
Date: Fri May 16 16:02:27 2014
New Revision: 1595251
URL: http://svn.apache.org/r1595251
Log:
PIG-3918: Implement PPNL for Tez mode (cheolsoo)
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/branches/tez/src/org/apache/pig/newplan/BaseOperatorPlan.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java
pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java
pig/branches/tez/test/org/apache/pig/test/TestLocalRearrange.java
pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java Fri May 16 16:02:27 2014
@@ -29,6 +29,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.runtime.api.LogicalInput;
@@ -45,8 +46,8 @@ public class POSimpleTezLoad extends POL
private KeyValueReader reader;
private transient Configuration conf;
- public POSimpleTezLoad(OperatorKey k) {
- super(k);
+ public POSimpleTezLoad(OperatorKey k, FileSpec lfile) {
+ super(k, lfile);
}
@Override
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri May 16 16:02:27 2014
@@ -690,8 +690,9 @@ public class TezDagBuilder extends TezOp
// Now add the input handling operator for the Tez backend
// TODO: Move this upstream to the PhysicalPlan generation
POSimpleTezLoad tezLoad = new POSimpleTezLoad(new OperatorKey(
- scope, nig.getNextNodeId(scope)));
+ scope, nig.getNextNodeId(scope)), ld.getLFile());
tezLoad.setInputKey(ld.getOperatorKey().toString());
+ tezLoad.setAlias(ld.getAlias());
tezOp.plan.add(tezLoad);
for (PhysicalOperator sucs : ldSucs) {
tezOp.plan.connect(tezLoad, sucs);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri May 16 16:02:27 2014
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
import org.apache.tez.client.TezSession;
@@ -38,6 +39,7 @@ import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
@@ -74,15 +76,19 @@ public class TezJob extends ControlledJo
this.vertexCounters = Maps.newHashMap();
}
- public DAG getDag() {
+ public DAG getDAG() {
return dag;
}
- public DAGStatus getDagStatus() {
+ public ApplicationId getApplicationId() {
+ return dagClient == null ? null : dagClient.getApplicationId();
+ }
+
+ public DAGStatus getDAGStatus() {
return dagStatus;
}
- public TezCounters getDagCounters() {
+ public TezCounters getDAGCounters() {
return dagCounters;
}
@@ -94,6 +100,21 @@ public class TezJob extends ControlledJo
return vertexCounters.get(group).get(name);
}
+ public Double getDAGProgress() {
+ Progress p = dagStatus.getDAGProgress();
+ return (double)p.getSucceededTaskCount()/(double)p.getTotalTaskCount();
+ }
+
+ public Map<String, Double> getVertexProgress() {
+ Map<String, Double> vertexProgress = Maps.newHashMap();
+ for (Map.Entry<String, Progress> entry : dagStatus.getVertexProgress().entrySet()) {
+ Progress p = entry.getValue();
+ Double progress = (double)p.getSucceededTaskCount()/(double)p.getTotalTaskCount();
+ vertexProgress.put(entry.getKey(), progress);
+ }
+ return vertexProgress;
+ }
+
@Override
public void submit() {
try {
@@ -232,4 +253,3 @@ public class TezJob extends ControlledJo
return "";
}
}
-
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java Fri May 16 16:02:27 2014
@@ -20,13 +20,11 @@ package org.apache.pig.backend.hadoop.ex
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop23.PigJobControl;
-import org.apache.pig.tools.pigstats.tez.TezStats;
public class TezJobControl extends PigJobControl {
private static final Log LOG = LogFactory.getLog(TezJobControl.class);
private TezJobNotifier notifier = null;
- private TezStats stats = null;
public TezJobControl(String groupName, int timeToSleep) {
super(groupName, timeToSleep);
@@ -36,10 +34,6 @@ public class TezJobControl extends PigJo
this.notifier = notifier;
}
- public void setTezStats(TezStats stats) {
- this.stats = stats;
- }
-
@Override
public void run() {
try {
@@ -57,9 +51,6 @@ public class TezJobControl extends PigJo
throw e;
} finally {
stop();
- if (stats!=null) {
- stats.accumulateStats(this);
- }
if (notifier != null) {
notifier.complete(this);
notifier = null;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri May 16 16:02:27 2014
@@ -42,6 +42,10 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.tez.TezStats;
+import org.apache.pig.tools.pigstats.tez.TezTaskStats;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.TezConfiguration;
/**
@@ -51,6 +55,8 @@ public class TezLauncher extends Launche
private static final Log log = LogFactory.getLog(TezLauncher.class);
private boolean aggregateWarning = false;
+ private TezScriptState tezScriptState;
+ private TezStats tezStats;
@Override
public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
@@ -66,7 +72,8 @@ public class TezLauncher extends Launche
List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
- TezStats tezStats = new TezStats(pc);
+ tezScriptState = TezScriptState.get();
+ tezStats = new TezStats(pc);
PigStats.start(tezStats);
TezJobControlCompiler jcc = new TezJobControlCompiler(pc, conf);
@@ -85,15 +92,13 @@ public class TezLauncher extends Launche
jc = jcc.compile(tezPlan, grpName, tezPlanContainer);
TezJobNotifier notifier = new TezJobNotifier(tezPlanContainer, tezPlan);
((TezJobControl)jc).setJobNotifier(notifier);
- ((TezJobControl)jc).setTezStats(tezStats);
// Initially, all jobs are in wait state.
List<ControlledJob> jobsWithoutIds = jc.getWaitingJobList();
log.info(jobsWithoutIds.size() + " tez job(s) waiting for submission.");
- // TODO: MapReduceLauncher does a couple of things here. For example,
- // notify PPNL of job submission, update PigStas, etc. We will worry
- // about them later.
+ tezScriptState.emitInitialPlanNotification(tezPlan);
+ tezScriptState.emitLaunchStartedNotification(tezPlan.size());
// Set the thread UDFContext so registered classes are available.
final UDFContext udfContext = UDFContext.getUDFContext();
@@ -109,27 +114,92 @@ public class TezLauncher extends Launche
jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
jcThread.setContextClassLoader(PigContext.getClassLoader());
+ // TezJobControl always holds a single TezJob. We use JobControl
+ // only because it is convenient to launch the job via
+ // ControlledJob.submit().
+ TezJob job = (TezJob)jobsWithoutIds.get(0);
+ tezStats.setTezJob(job);
+
// Mark the times that the jobs were submitted so it's reflected in job
// history props
long scriptSubmittedTimestamp = System.currentTimeMillis();
- for (ControlledJob job : jobsWithoutIds) {
- // Job.getConfiguration returns the shared configuration object
- Configuration jobConf = job.getJob().getConfiguration();
- jobConf.set("pig.script.submitted.timestamp",
- Long.toString(scriptSubmittedTimestamp));
- jobConf.set("pig.job.submitted.timestamp",
- Long.toString(System.currentTimeMillis()));
- }
+ // Job.getConfiguration returns the shared configuration object
+ Configuration jobConf = job.getJob().getConfiguration();
+ jobConf.set("pig.script.submitted.timestamp",
+ Long.toString(scriptSubmittedTimestamp));
+ jobConf.set("pig.job.submitted.timestamp",
+ Long.toString(System.currentTimeMillis()));
+
+ // Inform ppnl of jobs submission
+ tezScriptState.emitJobsSubmittedNotification(jobsWithoutIds.size());
// All the setup done, now lets launch the jobs. DAG is submitted to
// YARN cluster by TezJob.submit().
jcThread.start();
+
+ Double prevProgress = 0.0;
+ while(!jc.allFinished()) {
+ List<ControlledJob> jobsAssignedIdInThisRun = new ArrayList<ControlledJob>();
+ if (job.getApplicationId() != null) {
+ jobsAssignedIdInThisRun.add(job);
+ }
+ notifyStarted(job);
+ jobsWithoutIds.removeAll(jobsAssignedIdInThisRun);
+ prevProgress = notifyProgress(job, prevProgress);
+ }
+
+ notifyFinishedOrFailed(job);
+ tezStats.accumulateStats(job);
+ tezScriptState.emitProgressUpdatedNotification(100);
}
tezStats.finish();
+ tezScriptState.emitLaunchCompletedNotification(tezStats.getNumberSuccessfulJobs());
+
return tezStats;
}
+ private void notifyStarted(TezJob job) throws IOException {
+ for (Vertex v : job.getDAG().getVertices()) {
+ TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+ byte[] bb = v.getProcessorDescriptor().getUserPayload();
+ Configuration conf = TezUtils.createConfFromUserPayload(bb);
+ tts.setConf(conf);
+ tts.setId(v.getVertexName());
+ tezScriptState.emitJobStartedNotification(v.getVertexName());
+ }
+ }
+
+ private Double notifyProgress(TezJob job, Double prevProgress) {
+ int numberOfJobs = tezStats.getNumberJobs();
+ Double perCom = 0.0;
+ if (job.getJobState() == ControlledJob.State.RUNNING) {
+ Double fractionComplete = job.getDAGProgress();
+ perCom += fractionComplete;
+ }
+ perCom = (perCom/(double)numberOfJobs)*100;
+ if (perCom >= (prevProgress + 4.0)) {
+ tezScriptState.emitProgressUpdatedNotification( perCom.intValue() );
+ return perCom;
+ } else {
+ return prevProgress;
+ }
+ }
+
+ private void notifyFinishedOrFailed(TezJob job) {
+ if (job.getJobState() == ControlledJob.State.SUCCESS) {
+ for (Vertex v : job.getDAG().getVertices()) {
+ TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+ tezScriptState.emitjobFinishedNotification(tts);
+ }
+ } else if (job.getJobState() == ControlledJob.State.FAILED) {
+ for (Vertex v : ((TezJob)job).getDAG().getVertices()) {
+ TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+ tezScriptState.emitJobFailedNotification(tts);
+ }
+ }
+ }
+
@Override
public void explain(PhysicalPlan php, PigContext pc, PrintStream ps,
String format, boolean verbose) throws PlanException,
Modified: pig/branches/tez/src/org/apache/pig/newplan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/BaseOperatorPlan.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/BaseOperatorPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/BaseOperatorPlan.java Fri May 16 16:02:27 2014
@@ -190,6 +190,9 @@ public abstract class BaseOperatorPlan i
int fromPos,
Operator to,
int toPos) {
+ if( isConnected( from, to ) || from == null || to == null) {
+ return;
+ }
markDirty();
fromEdges.put(from, to, fromPos);
toEdges.put(to, from, toPos);
@@ -211,9 +214,9 @@ public abstract class BaseOperatorPlan i
* @param to Operator edge will go to
*/
public void connect(Operator from, Operator to) {
- if( isConnected( from, to ) )
- return;
-
+ if( isConnected( from, to ) || from == null || to == null) {
+ return;
+ }
markDirty();
fromEdges.put(from, to);
toEdges.put(to, from);
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java Fri May 16 16:02:27 2014
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
-import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.pig.PigRunner.ReturnCode;
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
import org.apache.pig.backend.hadoop.executionengine.tez.TezJob;
@@ -63,6 +62,7 @@ public class TezStats extends PigStats {
public static final String FS_COUNTER_GROUP = FileSystemCounter.class.getName();
public static final String TASK_COUNTER_GROUP = TaskCounter.class.getName();
+ private TezJob tezJob;
private List<String> dagStatsStrings;
private Map<String, TezTaskStats> tezOpVertexMap;
private List<TezTaskStats> taskStatsToBeRemoved;
@@ -178,24 +178,27 @@ public class TezStats extends PigStats {
LOG.info("Script Statistics:\n" + sb.toString());
}
+ public TezTaskStats getVertexStats(String vertexName) {
+ return tezOpVertexMap.get(vertexName);
+ }
+
/**
* Updates the statistics after DAG is finished.
*
* @param jc the job control
*/
- public void accumulateStats(JobControl jc) throws IOException {
- for (ControlledJob job : jc.getSuccessfulJobList()) {
- addVertexStats((TezJob)job, true);
- dagStatsStrings.add(getDisplayString((TezJob)job));
- }
- for (ControlledJob job : jc.getFailedJobList()) {
- addVertexStats((TezJob)job, false);
- dagStatsStrings.add(getDisplayString((TezJob)job));
+ public void accumulateStats(TezJob tezJob) throws IOException {
+ if (tezJob.getJobState() == ControlledJob.State.SUCCESS) {
+ addVertexStats(tezJob, true);
+ dagStatsStrings.add(getDisplayString(tezJob));
+ } else if (tezJob.getJobState() == ControlledJob.State.FAILED) {
+ addVertexStats(tezJob, false);
+ dagStatsStrings.add(getDisplayString(tezJob));
}
}
private void addVertexStats(TezJob tezJob, boolean succeeded) throws IOException {
- DAG dag = tezJob.getDag();
+ DAG dag = tezJob.getDAG();
for (String name : tezOpVertexMap.keySet()) {
Vertex v = dag.getVertex(name);
if (v != null) {
@@ -227,7 +230,7 @@ public class TezStats extends PigStats {
private static String getDisplayString(TezJob tezJob) {
StringBuilder sb = new StringBuilder();
- TezCounters cnt = tezJob.getDagCounters();
+ TezCounters cnt = tezJob.getDAGCounters();
if (cnt == null) {
return "";
}
@@ -258,6 +261,14 @@ public class TezStats extends PigStats {
return sb.toString();
}
+ public void setTezJob(TezJob tezJob) {
+ this.tezJob = tezJob;
+ }
+
+ public TezJob getTezJob() {
+ return tezJob;
+ }
+
@Override
public boolean isEmbedded() {
return false;
Modified: pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java Fri May 16 16:02:27 2014
@@ -74,6 +74,7 @@ public class TestAccumulator {
@Before
public void setUp() throws Exception {
+ Util.resetStateForExecModeSwitch();
// Drop stale configuration from previous test run
properties.remove(PigConfiguration.OPT_ACCUMULATOR);
pigServer = new PigServer(cluster.getExecType(), properties);
Modified: pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java Fri May 16 16:02:27 2014
@@ -121,7 +121,6 @@ import org.apache.pig.data.DefaultBagFac
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -202,10 +201,7 @@ public class TestBuiltin {
@Before
public void setUp() throws Exception {
- // re initialize FileLocalizer so that each test will run correctly
- // without any side effect of other tests - this is needed since some
- // tests are in mapred and some in local mode.
- FileLocalizer.setInitialized(false);
+ Util.resetStateForExecModeSwitch();
pigServer = new PigServer(ExecType.LOCAL, new Properties());
pigServer.setValidateEachStatement(true);
@@ -2634,6 +2630,7 @@ public class TestBuiltin {
@Test
public void testSFPig() throws Exception {
+ Util.resetStateForExecModeSwitch();
PigServer mrPigServer = new PigServer(cluster.getExecType(), properties);
String inputStr = "amy\tbob\tcharlene\tdavid\terin\tfrank";
Util.createInputFile(cluster, "testSFPig-input.txt", new String[]
Modified: pig/branches/tez/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestLocalRearrange.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestLocalRearrange.java Fri May 16 16:02:27 2014
@@ -209,6 +209,7 @@ public class TestLocalRearrange {
w.close();
Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+ Util.resetStateForExecModeSwitch();
PigServer myPig = new PigServer(cluster.getExecType(), cluster.getProperties());
myPig.registerQuery("data = load '" + INPUT_FILE + "' as (a0, a1, a2);");
Modified: pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java Fri May 16 16:02:27 2014
@@ -38,7 +38,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
-import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -47,7 +46,6 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.util.Utils;
import org.apache.pig.test.utils.TestHelper;
import org.junit.AfterClass;
import org.junit.Before;
@@ -72,6 +70,7 @@ public class TestSkewedJoin {
@Before
public void setUp() throws Exception {
+ Util.resetStateForExecModeSwitch();
pigServer = new PigServer(cluster.getExecType(), properties);
}
@@ -213,8 +212,6 @@ public class TestSkewedJoin {
@Test
public void testSkewedJoinWithNoProperties() throws IOException{
- pigServer = new PigServer(cluster.getExecType(), properties);
-
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
DataBag dbfrj = BagFactory.getInstance().newDefaultBag();