You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2012/12/20 17:31:07 UTC

svn commit: r1424575 - in /pig/trunk: CHANGES.txt test/org/apache/pig/pigunit/PigTest.java

Author: cheolsoo
Date: Thu Dec 20 16:31:07 2012
New Revision: 1424575

URL: http://svn.apache.org/viewvc?rev=1424575&view=rev
Log:
PIG-3096: Make PigUnit thread safe (cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/test/org/apache/pig/pigunit/PigTest.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1424575&r1=1424574&r2=1424575&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec 20 16:31:07 2012
@@ -64,6 +64,8 @@ PIG-3013: BinInterSedes improve chararra
 
 BUG FIXES
 
+PIG-3096: Make PigUnit thread safe (cheolsoo)
+
 PIG-3095: "which" is called many, many times for each Pig STREAM statement (nwhite via cheolsoo)
 
 PIG-3085: Errors and lacks in document "Built In Functions" (miyakawataku via cheolsoo)

Modified: pig/trunk/test/org/apache/pig/pigunit/PigTest.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/pigunit/PigTest.java?rev=1424575&r1=1424574&r2=1424575&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/pigunit/PigTest.java (original)
+++ pig/trunk/test/org/apache/pig/pigunit/PigTest.java Thu Dec 20 16:31:07 2012
@@ -34,6 +34,7 @@ import org.apache.pig.ExecType;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.pigunit.pig.PigServer;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
@@ -57,8 +58,9 @@ public class PigTest {
   /** The list of aliases to override in the script. */
   private final Map<String, String> aliasOverrides;
 
-  private static PigServer pig;
-  private static Cluster cluster;
+  private static ThreadLocal<PigServer> pig = new ThreadLocal<PigServer>();
+  private static ThreadLocal<Cluster> cluster = new ThreadLocal<Cluster>();
+
   private static final Logger LOG = Logger.getLogger(PigTest.class);
   private static final String EXEC_CLUSTER = "pigunit.exectype.cluster";
 
@@ -107,30 +109,39 @@ public class PigTest {
   public PigTest(String scriptPath, String[] args, PigServer pig, Cluster cluster)
       throws IOException {
     this(args, null, readFile(scriptPath));
-    PigTest.pig = pig;
-    PigTest.cluster = cluster;
+    PigTest.pig.set(pig);
+    PigTest.cluster.set(cluster);
   }
 
   /**
    * Connects and starts if needed the PigServer.
    *
-   * @return The cluster where input files can be copied.
+   * @return Reference to the Cluster in ThreadLocal.
    * @throws ExecException If the PigServer can't be started.
    */
   public static Cluster getCluster() throws ExecException {
-    if (cluster == null) {
+    if (cluster.get() == null) {
       if (System.getProperties().containsKey(EXEC_CLUSTER)) {
         LOG.info("Using cluster mode");
-        pig = new PigServer(ExecType.MAPREDUCE);
+        pig.set(new PigServer(ExecType.MAPREDUCE));
       } else {
         LOG.info("Using default local mode");
-        pig = new PigServer(ExecType.LOCAL);
+        pig.set(new PigServer(ExecType.LOCAL));
       }
 
-      cluster = new Cluster(pig.getPigContext());
+      cluster.set(new Cluster(pig.get().getPigContext()));
     }
 
-    return cluster;
+    return cluster.get();
+  }
+
+  /**
+   * Return the PigServer.
+   *
+   * @return Reference to the PigServer in ThreadLocal.
+   */
+  public static PigServer getPigServer() {
+    return pig.get();
   }
 
   /**
@@ -140,7 +151,7 @@ public class PigTest {
    * @throws ParseException The pig script could not have all its variables substituted.
    */
   protected void registerScript() throws IOException, ParseException {
-    PigTest.getCluster();
+    getCluster();
 
     BufferedReader pigIStream = new BufferedReader(new StringReader(this.originalTextPigScript));
     StringWriter pigOStream = new StringWriter();
@@ -157,7 +168,7 @@ public class PigTest {
     pw.close();
 
     String pigSubstitutedFile = f.getCanonicalPath();
-    pig.registerScript(pigSubstitutedFile, aliasOverrides);
+    getPigServer().registerScript(pigSubstitutedFile, aliasOverrides);
   }
 
   /**
@@ -180,7 +191,7 @@ public class PigTest {
    */
   public Iterator<Tuple> getAlias(String alias) throws IOException, ParseException {
     registerScript();
-    return pig.openIterator(alias);
+    return getPigServer().openIterator(alias);
   }
 
   /**
@@ -251,10 +262,10 @@ public class PigTest {
     registerScript();
 
     StringBuilder sb = new StringBuilder();
-    Schema.stringifySchema(sb, pig.dumpSchema(aliasInput), DataType.TUPLE) ;
+    Schema.stringifySchema(sb, getPigServer().dumpSchema(aliasInput), DataType.TUPLE) ;
 
-    final String destination = "pigunit-input-overriden.txt";
-    cluster.copyFromLocalFile(input, destination, true);
+    final String destination = FileLocalizer.getTemporaryPath(getPigServer().getPigContext()).toString();
+    getCluster().copyFromLocalFile(input, destination, true);
     override(aliasInput,
         String.format("%s = LOAD '%s' AS %s;", aliasInput, destination, sb.toString()));