You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/18 16:24:53 UTC

svn commit: r1543058 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/scripting/ src/org/apache/pig/tools/pigstats/ test/org/apache/pig/test/

Author: cheolsoo
Date: Mon Nov 18 15:24:52 2013
New Revision: 1543058

URL: http://svn.apache.org/r1543058
Log:
PIG-3525: PigStats.get() and ScriptState.get() shouldn't return MR-specific objects (cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecutionEngine.java
    pig/trunk/src/org/apache/pig/scripting/BoundScript.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
    pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java
    pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java
    pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Nov 18 15:24:52 2013
@@ -52,6 +52,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3525: PigStats.get() and ScriptState.get() shouldn't return MR-specific objects (cheolsoo)
+
 PIG-3568: Define the semantics of POStatus.STATUS_NULL (mwagner via cheolsoo)
 
 PIG-3561: Clean up PigStats and JobStats after PIG-3419 (cheolsoo)

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Mon Nov 18 15:24:52 2013
@@ -352,16 +352,18 @@ static int run(String args[], PigProgres
                      }
             }
         }
-        
+
         // create the context with the parameter
         PigContext pigContext = new PigContext(properties);
 
         // create the static script state object
+        ScriptState scriptState = pigContext.getExecutionEngine().instantiateScriptState();
         String commandLine = LoadFunc.join((AbstractList<String>)Arrays.asList(args), " ");
-        ScriptState scriptState = ScriptState.start(commandLine, pigContext);
+        scriptState.setCommandLine(commandLine);
         if (listener != null) {
             scriptState.registerListener(listener);
         }
+        ScriptState.start(scriptState);
 
         pigContext.getProperties().setProperty("pig.cmd.args", commandLine);
 

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Mon Nov 18 15:24:52 2013
@@ -229,6 +229,14 @@ public class PigServer {
         }
 
         addJarsFromProperties();
+
+        if (PigStats.get() == null) {
+            PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
+        }
+
+        if (ScriptState.get() == null) {
+            ScriptState.start(pigContext.getExecutionEngine().instantiateScriptState());
+        }
     }
 
     private void addJarsFromProperties() throws ExecException {
@@ -1370,7 +1378,7 @@ public class PigServer {
      * @return LogicalPlanData
      */
     public LogicalPlanData getLogicalPlanData() {
-	return new LogicalPlanData(currDAG.lp);
+        return new LogicalPlanData(currDAG.lp);
     }
 
     /*

Modified: pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Mon Nov 18 15:24:52 2013
@@ -167,6 +167,15 @@ public interface ExecutionEngine {
     public ScriptState instantiateScriptState();
 
     /**
+     * Creates a PigStats object which will be accessible as a ThreadLocal
+     * variable inside the PigStats class. This method is called when first
+     * initializing the PigStats.
+     *
+     * @return PigStats object.
+     */
+    public PigStats instantiatePigStats();
+
+    /**
      * Returns the ExecutableManager to be used in Pig Streaming.
      *
      * @return ExecutableManager to be used in Pig Streaming.

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecutionEngine.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRExecutionEngine.java Mon Nov 18 15:24:52 2013
@@ -22,8 +22,10 @@ import java.util.UUID;
 
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
+import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
 
 public class MRExecutionEngine extends HExecutionEngine {
 
@@ -32,7 +34,15 @@ public class MRExecutionEngine extends H
         this.launcher = new MapReduceLauncher();
     }
 
+    @Override
     public ScriptState instantiateScriptState() {
-        return new MRScriptState(UUID.randomUUID().toString());
+        MRScriptState ss = new MRScriptState(UUID.randomUUID().toString());
+        ss.setPigContext(pigContext);
+        return ss;
+    }
+
+    @Override
+    public PigStats instantiatePigStats() {
+        return new SimplePigStats();
     }
 }

Modified: pig/trunk/src/org/apache/pig/scripting/BoundScript.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/BoundScript.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/BoundScript.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/BoundScript.java Mon Nov 18 15:24:52 2013
@@ -262,9 +262,10 @@ public class BoundScript {
 
     private PigStats exec(String query) throws IOException {
         LOG.info("Query to run:\n" + query);
-        List<PigProgressNotificationListener> listeners = ScriptState.get()
-                .getAllListeners();
-        ScriptState.start("embedded", scriptContext.getPigContext());
+        List<PigProgressNotificationListener> listeners = ScriptState.get().getAllListeners();
+        PigContext pc = scriptContext.getPigContext();
+        ScriptState scriptState = pc.getExecutionEngine().instantiateScriptState();
+        ScriptState.start(scriptState);
         ScriptState.get().setScript(query);
         for (PigProgressNotificationListener listener : listeners) {
             ScriptState.get().registerListener(listener);
@@ -329,8 +330,10 @@ public class BoundScript {
         
         @Override
         public PigStats call() throws Exception {
-            LOG.info("Query to run:\n" + query);          
-            ScriptState.start("embedded", scriptContext.getPigContext());
+            LOG.info("Query to run:\n" + query);
+            PigContext pc = scriptContext.getPigContext();
+            ScriptState scriptState = pc.getExecutionEngine().instantiateScriptState();
+            ScriptState.start(scriptState);
             ScriptState.get().setScript(query);
             ScriptState.get().registerListener(adaptor);
             PigServer pigServer = new PigServer(ctx, true);

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Mon Nov 18 15:24:52 2013
@@ -43,7 +43,6 @@ import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.tools.pigstats.JobStats.JobState;
-import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
 
 import com.google.common.collect.Maps;
 
@@ -74,17 +73,9 @@ public abstract class PigStats {
     protected int returnCode = ReturnCode.UNKNOWN;
 
     public static PigStats get() {
-        if (tps.get() == null) {
-            LOG.info("PigStats has not been set. Defaulting to SimplePigStats");
-            tps.set(new SimplePigStats());
-        }
         return tps.get();
     }
 
-    static void set(PigStats stats) {
-        tps.set(stats);
-    }
-
     public static PigStats start(PigStats stats) {
         tps.set(stats);
         return tps.get();

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Mon Nov 18 15:24:52 2013
@@ -161,8 +161,7 @@ public class PigStatsUtil {
     }
 
     public static void setStatsMap(Map<String, List<PigStats>> statsMap) {
-        EmbeddedPigStats stats = new EmbeddedPigStats(statsMap);
-        PigStats.set(stats);
+        PigStats.start(new EmbeddedPigStats(statsMap));
     }
 
 

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Mon Nov 18 15:24:52 2013
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.jar.Attributes;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
@@ -34,7 +35,6 @@ import org.apache.commons.codec.binary.B
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -56,6 +56,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOUnion;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 
 /**
  * ScriptStates encapsulates settings for a Pig script that runs on a hadoop
@@ -64,7 +65,7 @@ import org.apache.pig.newplan.logical.re
  * job xml, users who want to know the relations between the script and MR jobs
  * can derive them from the job xmls.
  */
-public class ScriptState {
+public abstract class ScriptState {
 
     /**
      * Keys of Pig settings added to Jobs
@@ -146,33 +147,32 @@ public class ScriptState {
 
     protected List<PigProgressNotificationListener> listeners = new ArrayList<PigProgressNotificationListener>();
 
-    public static ScriptState start(String commandLine, PigContext pigContext) {
-        ExecType execType = null;
-        if (pigContext == null || pigContext.getExecType() == null) {
-            execType = ExecType.MAPREDUCE;
-        } else {
-            execType = pigContext.getExecType();
-        }
-        ScriptState ss = execType.getExecutionEngine(pigContext)
-                .instantiateScriptState();
-        ss.setCommandLine(commandLine);
-        ss.setPigContext(pigContext);
-        tss.set(ss);
-        return ss;
-    }
-
     protected ScriptState(String id) {
         this.id = id;
         this.script = "";
     }
 
     public static ScriptState get() {
-        if (tss.get() == null) {
-            ScriptState.start("", null);
-        }
         return tss.get();
     }
 
+    public static ScriptState start(ScriptState state) {
+        tss.set(state);
+        return tss.get();
+    }
+
+    /**
+     * @deprecated use {@link org.apache.pig.tools.pigstats.ScriptState#start(ScriptState)} instead.
+     */
+    @Deprecated
+    public static ScriptState start(String commandLine, PigContext pigContext) {
+        ScriptState ss = new MRScriptState(UUID.randomUUID().toString());
+        ss.setCommandLine(commandLine);
+        ss.setPigContext(pigContext);
+        tss.set(ss);
+        return ss;
+    }
+
     public void registerListener(PigProgressNotificationListener listener) {
         listeners.add(listener);
     }
@@ -263,12 +263,12 @@ public class ScriptState {
         return id;
     }
 
-    protected void setCommandLine(String commandLine) {
+    public void setCommandLine(String commandLine) {
         this.commandLine = new String(Base64.encodeBase64(commandLine
                 .getBytes()));
     }
 
-    protected String getCommandLine() {
+    public String getCommandLine() {
         return (commandLine == null) ? "" : commandLine;
     }
 

Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Mon Nov 18 15:24:52 2013
@@ -33,7 +33,6 @@ import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Properties;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.jar.JarOutputStream;
@@ -55,6 +54,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
@@ -109,7 +109,8 @@ public class TestJobControlCompiler {
     final String testUDFFileName = className+".class";
 
     // JobControlCompiler setup
-    PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
+    PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
+    PigContext pigContext = pigServer.getPigContext();
     pigContext.connect();
     pigContext.addJar(tmpFile.getAbsolutePath());
     JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, CONF);
@@ -168,7 +169,8 @@ public class TestJobControlCompiler {
         zipArchives.add(textFile);
         final List<File> tarArchives = createFiles(".tgz", ".tar.gz", ".tar");
 
-        final PigContext pigContext = new PigContext();
+        final PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
+        final PigContext pigContext = pigServer.getPigContext();
         pigContext.connect();
         pigContext.getProperties().put("pig.streaming.ship.files",
                 StringUtils.join(zipArchives, ","));

Modified: pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java Mon Nov 18 15:24:52 2013
@@ -22,10 +22,10 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertNull;
 
 import java.util.Iterator;
-import java.util.Properties;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -39,11 +39,13 @@ import org.junit.Test;
  * Test POPartialAgg runtime
  */
 public class TestPOPartialAggPlan  {
-    PigContext pc;
+    private static PigContext pc;
+    private static PigServer ps;
 
     @Before
     public void setUp() throws ExecException {
-        pc = new PigContext(ExecType.LOCAL, new Properties());
+        ps = new PigServer(ExecType.LOCAL);
+        pc = ps.getPigContext();
         pc.connect();
     }
 

Modified: pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPlanGeneration.java Mon Nov 18 15:24:52 2013
@@ -20,12 +20,12 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
-import java.util.Properties;
 
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
 import org.apache.pig.Expression;
 import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigServer;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -57,11 +57,13 @@ import org.junit.Test;
 
 public class TestPlanGeneration {
 
-    static PigContext pc;
+    private static PigContext pc;
+    private static PigServer ps;
 
     @BeforeClass
     public static void setUp() throws ExecException {
-        pc = new PigContext(ExecType.LOCAL, new Properties());
+        ps = new PigServer(ExecType.LOCAL);
+        pc = ps.getPigContext();
         pc.connect();
     }
 

Modified: pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java?rev=1543058&r1=1543057&r2=1543058&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestScriptLanguage.java Mon Nov 18 15:24:52 2013
@@ -38,14 +38,12 @@ import org.apache.pig.tools.pigstats.Pig
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestScriptLanguage {
 
     static MiniCluster cluster = MiniCluster.buildCluster();
-    private PigServer pigServer;
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
@@ -57,11 +55,6 @@ public class TestScriptLanguage {
         cluster.shutDown();
     }
 
-    @Before
-    public void setUp() throws Exception {
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-    }
-
     @After
     public void tearDown() throws Exception {
         Util.deleteFile(cluster, "simple_out");
@@ -78,6 +71,7 @@ public class TestScriptLanguage {
         String scriptName = name + "_testScript.py";
         Util.createLocalInputFile(scriptName, script);
         ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), scriptName);
         return statsMap;
     }