You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2012/12/27 05:49:15 UTC
svn commit: r1426120 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
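src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/data/
src/org/apache/pig/impl/util/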
Author: cheolsoo
Date: Thu Dec 27 04:49:15 2012
New Revision: 1426120
URL: http://svn.apache.org/viewvc?rev=1426120&view=rev
Log:
PIG-3050: Fix FindBugs multithreading warnings (cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec 27 04:49:15 2012
@@ -64,6 +64,8 @@ PIG-3013: BinInterSedes improve chararra
BUG FIXES
+PIG-3050: Fix FindBugs multithreading warnings (cheolsoo)
+
PIG-3066: Fix TestPigRunner in trunk (cheolsoo)
PIG-3101: Increase io.sort.mb in YARN MiniCluste (cheolsoo)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Thu Dec 27 04:49:15 2012
@@ -51,7 +51,7 @@ public final class PigHadoopLogger imple
public void warn(Object o, String msg, Enum warningEnum) {
String displayMessage = o.getClass().getName() + ": " + msg;
- if (aggregate) {
+ if (getAggregate()) {
if (reporter != null) {
reporter.getCounter(warningEnum).increment(1);
} else {
@@ -74,7 +74,7 @@ public final class PigHadoopLogger imple
this.reporter = rep;
}
- public boolean getAggregate() {
+ public synchronized boolean getAggregate() {
return aggregate;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Dec 27 04:49:15 2012
@@ -102,7 +102,7 @@ public abstract class PhysicalOperator e
// Will be used by operators to report status or transmit heartbeat
// Should be set by the backends to appropriate implementations that
// wrap their own version of a reporter.
- public static PigProgressable reporter;
+ private static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
// Will be used by operators to aggregate warning messages
// Should be set by the backends to appropriate implementations that
@@ -300,8 +300,8 @@ public abstract class PhysicalOperator e
}
//Should be removed once the model is clear
- if(reporter!=null) {
- reporter.progress();
+ if(getReporter()!=null) {
+ getReporter().progress();
}
if (!isInputAttached()) {
@@ -455,8 +455,18 @@ public abstract class PhysicalOperator e
public void reset() {
}
+ /**
+ * @return PigProgressable stored in threadlocal
+ */
+ public static PigProgressable getReporter() {
+ return PhysicalOperator.reporter.get();
+ }
+
+ /**
+ * @param reporter PigProgressable to be stored in threadlocal
+ */
public static void setReporter(PigProgressable reporter) {
- PhysicalOperator.reporter = reporter;
+ PhysicalOperator.reporter.set(reporter);
}
/**
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Thu Dec 27 04:49:15 2012
@@ -574,7 +574,9 @@ public class POProject extends Expressio
}
//Should be removed once the model is clear
- if(reporter!=null) reporter.progress();
+ if(getReporter()!=null) {
+ getReporter().progress();
+ }
if(!isInputAttached()) {
if (inputs.get(0).getResultType()==DataType.BAG)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java Thu Dec 27 04:49:15 2012
@@ -70,7 +70,7 @@ public class POUserComparisonFunc extend
private void instantiateFunc() {
this.func = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
- this.func.setReporter(reporter);
+ this.func.setReporter(getReporter());
}
public ComparisonFunc getComparator() {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Dec 27 04:49:15 2012
@@ -145,7 +145,7 @@ public class POUserFunc extends Expressi
//making the initializations here basically useless. Look at the processInput
//method where these variables are re-initialized. At that point, the PhysicalOperator
//is set up correctly with the reporter and pigLogger references
- this.func.setReporter(reporter);
+ this.func.setReporter(getReporter());
this.func.setPigLogger(pigLogger);
}
@@ -160,7 +160,7 @@ public class POUserFunc extends Expressi
// cheap to call the setReporter call everytime as to check whether I
// have (hopefully java will inline it).
if(!initialized) {
- func.setReporter(reporter);
+ func.setReporter(getReporter());
func.setPigLogger(pigLogger);
Configuration jobConf = UDFContext.getUDFContext().getJobConf();
if (jobConf != null) {
@@ -205,8 +205,8 @@ public class POUserFunc extends Expressi
}
//Should be removed once the model is clear
- if(reporter!=null) {
- reporter.progress();
+ if(getReporter()!=null) {
+ getReporter().progress();
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Thu Dec 27 04:49:15 2012
@@ -396,8 +396,8 @@ public class POFRJoin extends PhysicalOp
log.debug("Completed setup. Trying to build replication hash table");
for (Result res = lr.getNext(dummyTuple);res.returnStatus != POStatus.STATUS_EOP;res = lr.getNext(dummyTuple)) {
- if (reporter != null)
- reporter.progress();
+ if (getReporter() != null)
+ getReporter().progress();
Tuple tuple = (Tuple) res.result;
if (isKeyNull(tuple.get(1))) continue;
Tuple key = mTupleFactory.newTuple(1);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Thu Dec 27 04:49:15 2012
@@ -452,8 +452,8 @@ public class POForEach extends PhysicalO
}
}
- if(reporter!=null) {
- reporter.progress();
+ if(getReporter()!=null) {
+ getReporter().progress();
}
//createTuple(data);
res.result = createTuple(data);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java Thu Dec 27 04:49:15 2012
@@ -172,7 +172,9 @@ public class POJoinPackage extends POPac
lastInputTuple = true;
break;
}
- if(reporter!=null) reporter.progress();
+ if(getReporter()!=null) {
+ getReporter().progress();
+ }
}
// If we don't have any tuple for input#n
// we do not need any further process, return EOP
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Dec 27 04:49:15 2012
@@ -278,7 +278,9 @@ public class POPackage extends PhysicalO
} else {
dbs[index].add(copy);
}
- if(reporter!=null) reporter.progress();
+ if(getReporter()!=null) {
+ getReporter().progress();
+ }
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java Thu Dec 27 04:49:15 2012
@@ -168,7 +168,9 @@ public class POPackageLite extends POPac
//Create numInputs bags
ReadOnceBag db = null;
db = new ReadOnceBag(this, tupIter, key);
- if(reporter!=null) reporter.progress();
+ if(getReporter()!=null) {
+ getReporter().progress();
+ }
//Construct the output tuple by appending
//the key and all the above constructed bags
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Thu Dec 27 04:49:15 2012
@@ -52,8 +52,8 @@ public class POStream extends PhysicalOp
private StreamingCommand command; // Actual command to be run
private Properties properties;
- protected boolean initialized = false;
-
+ private boolean initialized = false;
+
protected BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
protected BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1);
@@ -178,7 +178,7 @@ public class POStream extends PhysicalOp
// in the course of the map/reduce - if we did
// then "initialized" will be true. If not, just
// send EOP down.
- if(initialized) {
+ if(getInitialized()) {
// signal End of ALL input to the Executable Manager's
// Input handler thread
binaryInputQueue.put(r);
@@ -242,6 +242,14 @@ public class POStream extends PhysicalOp
}
+ public synchronized boolean getInitialized() {
+ return initialized;
+ }
+
+ public synchronized void setInitialized(boolean initialized) {
+ this.initialized = initialized;
+ }
+
public Result getNextHelper(Tuple t) throws ExecException {
try {
synchronized(this) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java Thu Dec 27 04:49:15 2012
@@ -161,7 +161,9 @@ public class POUnion extends PhysicalOpe
Result res;
while(true){
- if(reporter!=null) reporter.progress();
+ if(getReporter()!=null) {
+ getReporter().progress();
+ }
res = inputs.get(ind).getNext(t);
lastInd = ind + 1;
Modified: pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Thu Dec 27 04:49:15 2012
@@ -396,8 +396,8 @@ public abstract class DefaultAbstractBag
* Report progress to HDFS.
*/
protected void reportProgress() {
- if (PhysicalOperator.reporter != null) {
- PhysicalOperator.reporter.progress();
+ if (PhysicalOperator.getReporter() != null) {
+ PhysicalOperator.getReporter().progress();
}
}
Modified: pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java Thu Dec 27 04:49:15 2012
@@ -114,8 +114,8 @@ public class NonSpillableDataBag impleme
* Report progress to HDFS.
*/
protected void reportProgress() {
- if (PhysicalOperator.reporter != null) {
- PhysicalOperator.reporter.progress();
+ if (PhysicalOperator.getReporter() != null) {
+ PhysicalOperator.getReporter().progress();
}
}
Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java Thu Dec 27 04:49:15 2012
@@ -274,9 +274,10 @@ public class SchemaTupleBackend {
} else {
SchemaTupleFrontend.lazyReset(pigContext);
SchemaTupleFrontend.reset();
- stb = new SchemaTupleBackend(jConf, isLocal);
- stb.copyAndResolve();
- }
+ SchemaTupleBackend stbInstance = new SchemaTupleBackend(jConf, isLocal);
+ stbInstance.copyAndResolve();
+ stb = stbInstance;
+ }
}
public static SchemaTupleFactory newSchemaTupleFactory(Schema s, boolean isAppendable, GenContext context) {
Modified: pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1426120&r1=1426119&r2=1426120&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Thu Dec 27 04:49:15 2012
@@ -78,7 +78,7 @@ public class SpillableMemoryManager impl
// log notification on collection threshold exceeded only the first time
private boolean firstCollectionThreshExceededLogged = false;
- private static SpillableMemoryManager manager;
+ private static volatile SpillableMemoryManager manager;
private SpillableMemoryManager() {
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);