You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2012/12/27 05:49:15 UTC

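svn commit: r1426120 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/data/ src/org/apache/pig/impl/util/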

Author: cheolsoo
Date: Thu Dec 27 04:49:15 2012
New Revision: 1426120

URL: http://svn.apache.org/viewvc?rev=1426120&view=rev
Log:
PIG-3050: Fix FindBugs multithreading warnings (cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
    pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
    pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
    pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec 27 04:49:15 2012
@@ -64,6 +64,8 @@ PIG-3013: BinInterSedes improve chararra
 
 BUG FIXES
 
+PIG-3050: Fix FindBugs multithreading warnings (cheolsoo)
+
 PIG-3066: Fix TestPigRunner in trunk (cheolsoo)
 
 PIG-3101: Increase io.sort.mb in YARN MiniCluste (cheolsoo)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Thu Dec 27 04:49:15 2012
@@ -51,7 +51,7 @@ public final class PigHadoopLogger imple
     public void warn(Object o, String msg, Enum warningEnum) {
         String displayMessage = o.getClass().getName() + ": " + msg;
         
-        if (aggregate) {
+        if (getAggregate()) {
             if (reporter != null) {
                 reporter.getCounter(warningEnum).increment(1);
             } else {
@@ -74,7 +74,7 @@ public final class PigHadoopLogger imple
         this.reporter = rep;
     }
     
-    public boolean getAggregate() {
+    public synchronized boolean getAggregate() {
         return aggregate;
     }
     

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Dec 27 04:49:15 2012
@@ -102,7 +102,7 @@ public abstract class PhysicalOperator e
     // Will be used by operators to report status or transmit heartbeat
     // Should be set by the backends to appropriate implementations that
     // wrap their own version of a reporter.
-    public static PigProgressable reporter;
+    private static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
 
     // Will be used by operators to aggregate warning messages
     // Should be set by the backends to appropriate implementations that
@@ -300,8 +300,8 @@ public abstract class PhysicalOperator e
         }
 
         //Should be removed once the model is clear
-        if(reporter!=null) {
-            reporter.progress();
+        if(getReporter()!=null) {
+            getReporter().progress();
         }
 
         if (!isInputAttached()) {
@@ -455,8 +455,18 @@ public abstract class PhysicalOperator e
     public void reset() {
     }
 
+    /**
+     * @return PigProgressable stored in threadlocal
+     */
+    public static PigProgressable getReporter() {
+        return PhysicalOperator.reporter.get();
+    }
+
+    /**
+     * @param reporter PigProgressable to be stored in threadlocal
+     */
     public static void setReporter(PigProgressable reporter) {
-        PhysicalOperator.reporter = reporter;
+        PhysicalOperator.reporter.set(reporter);
     }
 
     /**

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Thu Dec 27 04:49:15 2012
@@ -574,7 +574,9 @@ public class POProject extends Expressio
         }
         
         //Should be removed once the model is clear
-        if(reporter!=null) reporter.progress();
+        if(getReporter()!=null) {
+            getReporter().progress();
+        }
         
         if(!isInputAttached()) {
             if (inputs.get(0).getResultType()==DataType.BAG)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java Thu Dec 27 04:49:15 2012
@@ -70,7 +70,7 @@ public class POUserComparisonFunc extend
     
     private void instantiateFunc() {
         this.func = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
-        this.func.setReporter(reporter);
+        this.func.setReporter(getReporter());
     }
     
     public ComparisonFunc getComparator() {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Dec 27 04:49:15 2012
@@ -145,7 +145,7 @@ public class POUserFunc extends Expressi
         //making the initializations here basically useless. Look at the processInput
         //method where these variables are re-initialized. At that point, the PhysicalOperator
         //is set up correctly with the reporter and pigLogger references
-        this.func.setReporter(reporter);
+        this.func.setReporter(getReporter());
         this.func.setPigLogger(pigLogger);
     }
 
@@ -160,7 +160,7 @@ public class POUserFunc extends Expressi
         // cheap to call the setReporter call everytime as to check whether I
         // have (hopefully java will inline it).
         if(!initialized) {
-            func.setReporter(reporter);
+            func.setReporter(getReporter());
             func.setPigLogger(pigLogger);
             Configuration jobConf = UDFContext.getUDFContext().getJobConf();
             if (jobConf != null) {
@@ -205,8 +205,8 @@ public class POUserFunc extends Expressi
         }
 
         //Should be removed once the model is clear
-        if(reporter!=null) {
-            reporter.progress();
+        if(getReporter()!=null) {
+            getReporter().progress();
         }
 
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Thu Dec 27 04:49:15 2012
@@ -396,8 +396,8 @@ public class POFRJoin extends PhysicalOp
 
             log.debug("Completed setup. Trying to build replication hash table");
             for (Result res = lr.getNext(dummyTuple);res.returnStatus != POStatus.STATUS_EOP;res = lr.getNext(dummyTuple)) {
-                if (reporter != null)
-                    reporter.progress();               
+                if (getReporter() != null)
+                    getReporter().progress();
                 Tuple tuple = (Tuple) res.result;
                 if (isKeyNull(tuple.get(1))) continue;
                 Tuple key = mTupleFactory.newTuple(1);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Thu Dec 27 04:49:15 2012
@@ -452,8 +452,8 @@ public class POForEach extends PhysicalO
                     }
 
                 }
-                if(reporter!=null) {
-                    reporter.progress();
+                if(getReporter()!=null) {
+                    getReporter().progress();
                 }
                 //createTuple(data);
                 res.result = createTuple(data);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java Thu Dec 27 04:49:15 2012
@@ -172,7 +172,9 @@ public class POJoinPackage extends POPac
                     lastInputTuple = true;
                     break;
                 }
-                if(reporter!=null) reporter.progress();
+                if(getReporter()!=null) {
+                    getReporter().progress();
+                }
             }
             // If we don't have any tuple for input#n
             // we do not need any further process, return EOP

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Dec 27 04:49:15 2012
@@ -278,7 +278,9 @@ public class POPackage extends PhysicalO
                     } else {
                         dbs[index].add(copy);
                     }
-                    if(reporter!=null) reporter.progress();
+                    if(getReporter()!=null) {
+                        getReporter().progress();
+                    }
                 }
             }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java Thu Dec 27 04:49:15 2012
@@ -168,7 +168,9 @@ public class POPackageLite extends POPac
         //Create numInputs bags
         ReadOnceBag db = null;
         db = new ReadOnceBag(this, tupIter, key);
-        if(reporter!=null) reporter.progress();
+        if(getReporter()!=null) {
+            getReporter().progress();
+        }
         
         //Construct the output tuple by appending
         //the key and all the above constructed bags

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Thu Dec 27 04:49:15 2012
@@ -52,8 +52,8 @@ public class POStream extends PhysicalOp
     private StreamingCommand command;               // Actual command to be run
     private Properties properties;
 
-    protected boolean initialized = false;
-    
+    private boolean initialized = false;
+
     protected BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
 
     protected BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1);
@@ -178,7 +178,7 @@ public class POStream extends PhysicalOp
                     // in the course of the map/reduce - if we did
                     // then "initialized" will be true. If not, just
                     // send EOP down.
-                    if(initialized) {
+                    if(getInitialized()) {
                         // signal End of ALL input to the Executable Manager's 
                         // Input handler thread
                         binaryInputQueue.put(r);
@@ -242,6 +242,14 @@ public class POStream extends PhysicalOp
         
     }
 
+    public synchronized boolean getInitialized() {
+        return initialized;
+    }
+
+    public synchronized void setInitialized(boolean initialized) {
+        this.initialized = initialized;
+    }
+
     public Result getNextHelper(Tuple t) throws ExecException {
         try {
             synchronized(this) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java Thu Dec 27 04:49:15 2012
@@ -161,7 +161,9 @@ public class POUnion extends PhysicalOpe
                 Result res;
 
                 while(true){
-                    if(reporter!=null) reporter.progress();
+                    if(getReporter()!=null) {
+                        getReporter().progress();
+                    }
                     res = inputs.get(ind).getNext(t);
                     lastInd = ind + 1;
 

Modified: pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Thu Dec 27 04:49:15 2012
@@ -396,8 +396,8 @@ public abstract class DefaultAbstractBag
      * Report progress to HDFS.
      */
     protected void reportProgress() {
-        if (PhysicalOperator.reporter != null) {
-            PhysicalOperator.reporter.progress();
+        if (PhysicalOperator.getReporter() != null) {
+            PhysicalOperator.getReporter().progress();
         }
     }
 

Modified: pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java Thu Dec 27 04:49:15 2012
@@ -114,8 +114,8 @@ public class NonSpillableDataBag impleme
      * Report progress to HDFS.
      */
     protected void reportProgress() {
-        if (PhysicalOperator.reporter != null) {
-            PhysicalOperator.reporter.progress();
+        if (PhysicalOperator.getReporter() != null) {
+            PhysicalOperator.getReporter().progress();
         }
     }
 

Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java Thu Dec 27 04:49:15 2012
@@ -274,9 +274,10 @@ public class SchemaTupleBackend {
         } else {
             SchemaTupleFrontend.lazyReset(pigContext);
             SchemaTupleFrontend.reset();
-            stb = new SchemaTupleBackend(jConf, isLocal);
-        stb.copyAndResolve();
-    }
+            SchemaTupleBackend stbInstance = new SchemaTupleBackend(jConf, isLocal);
+            stbInstance.copyAndResolve();
+            stb = stbInstance;
+        }
     }
 
     public static SchemaTupleFactory newSchemaTupleFactory(Schema s, boolean isAppendable, GenContext context)  {

Modified: pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Thu Dec 27 04:49:15 2012
@@ -78,7 +78,7 @@ public class SpillableMemoryManager impl
     // log notification on collection threshold exceeded only the first time
     private boolean firstCollectionThreshExceededLogged = false;
     
-    private static SpillableMemoryManager manager;
+    private static volatile SpillableMemoryManager manager;
 
     private SpillableMemoryManager() {
         ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);