You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/05/16 18:02:27 UTC

svn commit: r1595251 - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/newplan/ src/org/apache/pig/tools/pigstats/tez/ test/org/apache/pig/test/

Author: cheolsoo
Date: Fri May 16 16:02:27 2014
New Revision: 1595251

URL: http://svn.apache.org/r1595251
Log:
PIG-3918: Implement PPNL for Tez mode (cheolsoo)

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/tez/src/org/apache/pig/newplan/BaseOperatorPlan.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
    pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java
    pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java
    pig/branches/tez/test/org/apache/pig/test/TestLocalRearrange.java
    pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java Fri May 16 16:02:27 2014
@@ -29,6 +29,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -45,8 +46,8 @@ public class POSimpleTezLoad extends POL
     private KeyValueReader reader;
     private transient Configuration conf;
 
-    public POSimpleTezLoad(OperatorKey k) {
-        super(k);
+    public POSimpleTezLoad(OperatorKey k, FileSpec lfile) {
+        super(k, lfile);
     }
 
     @Override

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri May 16 16:02:27 2014
@@ -690,8 +690,9 @@ public class TezDagBuilder extends TezOp
                 // Now add the input handling operator for the Tez backend
                 // TODO: Move this upstream to the PhysicalPlan generation
                 POSimpleTezLoad tezLoad = new POSimpleTezLoad(new OperatorKey(
-                        scope, nig.getNextNodeId(scope)));
+                        scope, nig.getNextNodeId(scope)), ld.getLFile());
                 tezLoad.setInputKey(ld.getOperatorKey().toString());
+                tezLoad.setAlias(ld.getAlias());
                 tezOp.plan.add(tezLoad);
                 for (PhysicalOperator sucs : ldSucs) {
                     tezOp.plan.connect(tezLoad, sucs);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri May 16 16:02:27 2014
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
 import org.apache.tez.client.TezSession;
@@ -38,6 +39,7 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.Progress;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
@@ -74,15 +76,19 @@ public class TezJob extends ControlledJo
         this.vertexCounters = Maps.newHashMap();
     }
 
-    public DAG getDag() {
+    public DAG getDAG() {
         return dag;
     }
 
-    public DAGStatus getDagStatus() {
+    public ApplicationId getApplicationId() {
+        return dagClient == null ? null : dagClient.getApplicationId();
+    }
+
+    public DAGStatus getDAGStatus() {
         return dagStatus;
     }
 
-    public TezCounters getDagCounters() {
+    public TezCounters getDAGCounters() {
         return dagCounters;
     }
 
@@ -94,6 +100,21 @@ public class TezJob extends ControlledJo
         return vertexCounters.get(group).get(name);
     }
 
+    public Double getDAGProgress() {
+        Progress p = dagStatus.getDAGProgress();
+        return (double)p.getSucceededTaskCount()/(double)p.getTotalTaskCount();
+    }
+
+    public Map<String, Double> getVertexProgress() {
+        Map<String, Double> vertexProgress = Maps.newHashMap();
+        for (Map.Entry<String, Progress> entry : dagStatus.getVertexProgress().entrySet()) {
+            Progress p = entry.getValue();
+            Double progress = (double)p.getSucceededTaskCount()/(double)p.getTotalTaskCount();
+            vertexProgress.put(entry.getKey(), progress);
+        }
+        return vertexProgress;
+    }
+
     @Override
     public void submit() {
         try {
@@ -232,4 +253,3 @@ public class TezJob extends ControlledJo
         return "";
     }
 }
-

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java Fri May 16 16:02:27 2014
@@ -20,13 +20,11 @@ package org.apache.pig.backend.hadoop.ex
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.hadoop23.PigJobControl;
-import org.apache.pig.tools.pigstats.tez.TezStats;
 
 public class TezJobControl extends PigJobControl {
 
     private static final Log LOG = LogFactory.getLog(TezJobControl.class);
     private TezJobNotifier notifier = null;
-    private TezStats stats = null;
 
     public TezJobControl(String groupName, int timeToSleep) {
         super(groupName, timeToSleep);
@@ -36,10 +34,6 @@ public class TezJobControl extends PigJo
         this.notifier = notifier;
     }
 
-    public void setTezStats(TezStats stats) {
-        this.stats = stats;
-    }
-
     @Override
     public void run() {
         try {
@@ -57,9 +51,6 @@ public class TezJobControl extends PigJo
                 throw e;
             } finally {
                 stop();
-                if (stats!=null) {
-                    stats.accumulateStats(this);
-                }
                 if (notifier != null) {
                     notifier.complete(this);
                     notifier = null;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri May 16 16:02:27 2014
@@ -42,6 +42,10 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.tez.TezStats;
+import org.apache.pig.tools.pigstats.tez.TezTaskStats;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.TezConfiguration;
 
 /**
@@ -51,6 +55,8 @@ public class TezLauncher extends Launche
 
     private static final Log log = LogFactory.getLog(TezLauncher.class);
     private boolean aggregateWarning = false;
+    private TezScriptState tezScriptState;
+    private TezStats tezStats;
 
     @Override
     public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
@@ -66,7 +72,8 @@ public class TezLauncher extends Launche
 
         List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
 
-        TezStats tezStats = new TezStats(pc);
+        tezScriptState = TezScriptState.get();
+        tezStats = new TezStats(pc);
         PigStats.start(tezStats);
 
         TezJobControlCompiler jcc = new TezJobControlCompiler(pc, conf);
@@ -85,15 +92,13 @@ public class TezLauncher extends Launche
             jc = jcc.compile(tezPlan, grpName, tezPlanContainer);
             TezJobNotifier notifier = new TezJobNotifier(tezPlanContainer, tezPlan);
             ((TezJobControl)jc).setJobNotifier(notifier);
-            ((TezJobControl)jc).setTezStats(tezStats);
 
             // Initially, all jobs are in wait state.
             List<ControlledJob> jobsWithoutIds = jc.getWaitingJobList();
             log.info(jobsWithoutIds.size() + " tez job(s) waiting for submission.");
 
-            // TODO: MapReduceLauncher does a couple of things here. For example,
-            // notify PPNL of job submission, update PigStas, etc. We will worry
-            // about them later.
+            tezScriptState.emitInitialPlanNotification(tezPlan);
+            tezScriptState.emitLaunchStartedNotification(tezPlan.size());
 
             // Set the thread UDFContext so registered classes are available.
             final UDFContext udfContext = UDFContext.getUDFContext();
@@ -109,27 +114,92 @@ public class TezLauncher extends Launche
             jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
             jcThread.setContextClassLoader(PigContext.getClassLoader());
 
+            // TezJobControl always holds a single TezJob. We use JobControl
+            // only because it is convenient to launch the job via
+            // ControlledJob.submit().
+            TezJob job = (TezJob)jobsWithoutIds.get(0);
+            tezStats.setTezJob(job);
+
             // Mark the times that the jobs were submitted so it's reflected in job
             // history props
             long scriptSubmittedTimestamp = System.currentTimeMillis();
-            for (ControlledJob job : jobsWithoutIds) {
-                // Job.getConfiguration returns the shared configuration object
-                Configuration jobConf = job.getJob().getConfiguration();
-                jobConf.set("pig.script.submitted.timestamp",
-                        Long.toString(scriptSubmittedTimestamp));
-                jobConf.set("pig.job.submitted.timestamp",
-                        Long.toString(System.currentTimeMillis()));
-            }
+            // Job.getConfiguration returns the shared configuration object
+            Configuration jobConf = job.getJob().getConfiguration();
+            jobConf.set("pig.script.submitted.timestamp",
+                    Long.toString(scriptSubmittedTimestamp));
+            jobConf.set("pig.job.submitted.timestamp",
+                    Long.toString(System.currentTimeMillis()));
+
+            // Inform ppnl of jobs submission
+            tezScriptState.emitJobsSubmittedNotification(jobsWithoutIds.size());
 
             // All the setup done, now lets launch the jobs. DAG is submitted to
             // YARN cluster by TezJob.submit().
             jcThread.start();
+
+            Double prevProgress = 0.0;
+            while(!jc.allFinished()) {
+                List<ControlledJob> jobsAssignedIdInThisRun = new ArrayList<ControlledJob>();
+                if (job.getApplicationId() != null) {
+                    jobsAssignedIdInThisRun.add(job);
+                }
+                notifyStarted(job);
+                jobsWithoutIds.removeAll(jobsAssignedIdInThisRun);
+                prevProgress = notifyProgress(job, prevProgress);
+            }
+
+            notifyFinishedOrFailed(job);
+            tezStats.accumulateStats(job);
+            tezScriptState.emitProgressUpdatedNotification(100);
         }
 
         tezStats.finish();
+        tezScriptState.emitLaunchCompletedNotification(tezStats.getNumberSuccessfulJobs());
+
         return tezStats;
     }
 
+    private void notifyStarted(TezJob job) throws IOException {
+        for (Vertex v : job.getDAG().getVertices()) {
+            TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+            byte[] bb = v.getProcessorDescriptor().getUserPayload();
+            Configuration conf = TezUtils.createConfFromUserPayload(bb);
+            tts.setConf(conf);
+            tts.setId(v.getVertexName());
+            tezScriptState.emitJobStartedNotification(v.getVertexName());
+        }
+    }
+
+    private Double notifyProgress(TezJob job, Double prevProgress) {
+        int numberOfJobs = tezStats.getNumberJobs();
+        Double perCom = 0.0;
+        if (job.getJobState() == ControlledJob.State.RUNNING) {
+            Double fractionComplete = job.getDAGProgress();
+            perCom += fractionComplete;
+        }
+        perCom = (perCom/(double)numberOfJobs)*100;
+        if (perCom >= (prevProgress + 4.0)) {
+            tezScriptState.emitProgressUpdatedNotification( perCom.intValue() );
+            return perCom;
+        } else {
+            return prevProgress;
+        }
+    }
+
+    private void notifyFinishedOrFailed(TezJob job) {
+        if (job.getJobState() == ControlledJob.State.SUCCESS) {
+            for (Vertex v : job.getDAG().getVertices()) {
+                TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+                tezScriptState.emitjobFinishedNotification(tts);
+            }
+        } else if (job.getJobState() == ControlledJob.State.FAILED) {
+            for (Vertex v : ((TezJob)job).getDAG().getVertices()) {
+                TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+                tezScriptState.emitJobFailedNotification(tts);
+            }
+        }
+    }
+
     @Override
     public void explain(PhysicalPlan php, PigContext pc, PrintStream ps,
             String format, boolean verbose) throws PlanException,

Modified: pig/branches/tez/src/org/apache/pig/newplan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/BaseOperatorPlan.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/BaseOperatorPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/BaseOperatorPlan.java Fri May 16 16:02:27 2014
@@ -190,6 +190,9 @@ public abstract class BaseOperatorPlan i
                         int fromPos,
                         Operator to,
                         int toPos) {
+        if( isConnected( from, to ) || from == null || to == null) {
+            return;
+        }
         markDirty();
         fromEdges.put(from, to, fromPos);
         toEdges.put(to, from, toPos);
@@ -211,9 +214,9 @@ public abstract class BaseOperatorPlan i
      * @param to Operator edge will go to
      */
     public void connect(Operator from, Operator to) {
-    	if( isConnected( from, to ) )
-    		return;
-    	
+        if( isConnected( from, to ) || from == null || to == null) {
+            return;
+        }
         markDirty();
         fromEdges.put(from, to);
         toEdges.put(to, from);

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/tez/TezStats.java Fri May 16 16:02:27 2014
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
-import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
 import org.apache.pig.PigRunner.ReturnCode;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezJob;
@@ -63,6 +62,7 @@ public class TezStats extends PigStats {
     public static final String FS_COUNTER_GROUP = FileSystemCounter.class.getName();
     public static final String TASK_COUNTER_GROUP = TaskCounter.class.getName();
 
+    private TezJob tezJob;
     private List<String> dagStatsStrings;
     private Map<String, TezTaskStats> tezOpVertexMap;
     private List<TezTaskStats> taskStatsToBeRemoved;
@@ -178,24 +178,27 @@ public class TezStats extends PigStats {
         LOG.info("Script Statistics:\n" + sb.toString());
     }
 
+    public TezTaskStats getVertexStats(String vertexName) {
+        return tezOpVertexMap.get(vertexName);
+    }
+
     /**
      * Updates the statistics after DAG is finished.
      *
      * @param jc the job control
      */
-    public void accumulateStats(JobControl jc) throws IOException {
-        for (ControlledJob job : jc.getSuccessfulJobList()) {
-            addVertexStats((TezJob)job, true);
-            dagStatsStrings.add(getDisplayString((TezJob)job));
-        }
-        for (ControlledJob job : jc.getFailedJobList()) {
-            addVertexStats((TezJob)job, false);
-            dagStatsStrings.add(getDisplayString((TezJob)job));
+    public void accumulateStats(TezJob tezJob) throws IOException {
+        if (tezJob.getJobState() == ControlledJob.State.SUCCESS) {
+            addVertexStats(tezJob, true);
+            dagStatsStrings.add(getDisplayString(tezJob));
+        } else if (tezJob.getJobState() == ControlledJob.State.FAILED) {
+            addVertexStats(tezJob, false);
+            dagStatsStrings.add(getDisplayString(tezJob));
         }
     }
 
     private void addVertexStats(TezJob tezJob, boolean succeeded) throws IOException {
-        DAG dag = tezJob.getDag();
+        DAG dag = tezJob.getDAG();
         for (String name : tezOpVertexMap.keySet()) {
             Vertex v = dag.getVertex(name);
             if (v != null) {
@@ -227,7 +230,7 @@ public class TezStats extends PigStats {
 
     private static String getDisplayString(TezJob tezJob) {
         StringBuilder sb = new StringBuilder();
-        TezCounters cnt = tezJob.getDagCounters();
+        TezCounters cnt = tezJob.getDAGCounters();
         if (cnt == null) {
             return "";
         }
@@ -258,6 +261,14 @@ public class TezStats extends PigStats {
         return sb.toString();
     }
 
+    public void setTezJob(TezJob tezJob) {
+        this.tezJob = tezJob;
+    }
+
+    public TezJob getTezJob() {
+        return tezJob;
+    }
+
     @Override
     public boolean isEmbedded() {
         return false;

Modified: pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java Fri May 16 16:02:27 2014
@@ -74,6 +74,7 @@ public class TestAccumulator {
 
     @Before
     public void setUp() throws Exception {
+        Util.resetStateForExecModeSwitch();
         // Drop stale configuration from previous test run
         properties.remove(PigConfiguration.OPT_ACCUMULATOR);
         pigServer = new PigServer(cluster.getExecType(), properties);

Modified: pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestBuiltin.java Fri May 16 16:02:27 2014
@@ -121,7 +121,6 @@ import org.apache.pig.data.DefaultBagFac
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -202,10 +201,7 @@ public class TestBuiltin {
 
     @Before
     public void setUp() throws Exception {
-        // re initialize FileLocalizer so that each test will run correctly
-        // without any side effect of other tests - this is needed since some
-        // tests are in mapred and some in local mode.
-        FileLocalizer.setInitialized(false);
+        Util.resetStateForExecModeSwitch();
 
         pigServer = new PigServer(ExecType.LOCAL, new Properties());
         pigServer.setValidateEachStatement(true);
@@ -2634,6 +2630,7 @@ public class TestBuiltin {
 
     @Test
     public void testSFPig() throws Exception {
+        Util.resetStateForExecModeSwitch();
         PigServer mrPigServer = new PigServer(cluster.getExecType(), properties);
         String inputStr = "amy\tbob\tcharlene\tdavid\terin\tfrank";
         Util.createInputFile(cluster, "testSFPig-input.txt", new String[]

Modified: pig/branches/tez/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestLocalRearrange.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestLocalRearrange.java Fri May 16 16:02:27 2014
@@ -209,6 +209,7 @@ public class TestLocalRearrange  {
             w.close();
             Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
 
+            Util.resetStateForExecModeSwitch();
             PigServer myPig = new PigServer(cluster.getExecType(), cluster.getProperties());
 
             myPig.registerQuery("data = load '" + INPUT_FILE + "' as (a0, a1, a2);");

Modified: pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java?rev=1595251&r1=1595250&r2=1595251&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java Fri May 16 16:02:27 2014
@@ -38,7 +38,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
-import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -47,7 +46,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.util.Utils;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -72,6 +70,7 @@ public class TestSkewedJoin {
 
     @Before
     public void setUp() throws Exception {
+        Util.resetStateForExecModeSwitch();
         pigServer = new PigServer(cluster.getExecType(), properties);
     }
 
@@ -213,8 +212,6 @@ public class TestSkewedJoin {
 
     @Test
     public void testSkewedJoinWithNoProperties() throws IOException{
-        pigServer = new PigServer(cluster.getExecType(), properties);
-
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag();