You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/12 00:53:07 UTC

[1/2] incubator-beam git commit: Fix FindBugs Errors in the Direct Runner

Repository: incubator-beam
Updated Branches:
  refs/heads/master f0f4af581 -> fe17ef7f8


Fix FindBugs Errors in the Direct Runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfa4e4ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfa4e4ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfa4e4ec

Branch: refs/heads/master
Commit: bfa4e4ece0091efc635c5c62c3eaf955f597fa39
Parents: f0f4af5
Author: Thomas Groh <tg...@google.com>
Authored: Wed Nov 9 13:24:19 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Nov 11 16:49:09 2016 -0800

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     | 13 -------
 .../runners/direct/AggregatorContainer.java     |  2 +-
 .../direct/ExecutorServiceParallelExecutor.java | 41 ++++++++++++--------
 .../direct/WatermarkCallbackExecutor.java       |  4 +-
 .../beam/runners/direct/WatermarkManager.java   |  4 +-
 5 files changed, 32 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 43cf3c0..8983b1c 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -40,19 +40,6 @@
       </resource>
     </resources>
 
-    <pluginManagement>
-      <plugins>
-        <!-- BEAM-924 -->
-        <plugin>
-          <groupId>org.codehaus.mojo</groupId>
-          <artifactId>findbugs-maven-plugin</artifactId>
-          <configuration>
-            <skip>true</skip>
-          </configuration>
-        </plugin>
-      </plugins>
-    </pluginManagement>
-
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
index 7b6bc64..e86bc3e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
@@ -43,7 +43,7 @@ public class AggregatorContainer {
     private final String name;
     private final CombineFn<InputT, AccumT, OutputT> combiner;
     @GuardedBy("this")
-    private AccumT accumulator = null;
+    private volatile AccumT accumulator = null;
     private boolean committed = false;
 
     private AggregatorInfo(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index d1ffea1..30fc417 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
@@ -30,12 +32,12 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -142,7 +144,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
         CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader());
 
     this.allUpdates = new ConcurrentLinkedQueue<>();
-    this.visibleUpdates = new ArrayBlockingQueue<>(20);
+    this.visibleUpdates = new LinkedBlockingQueue<>();
 
     parallelExecutorService = TransformExecutorServices.parallel(executorService);
     defaultCompletionCallback =
@@ -180,7 +182,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   @SuppressWarnings("unchecked")
   public void scheduleConsumption(
       AppliedPTransform<?, ?, ?> consumer,
-      @Nullable CommittedBundle<?> bundle,
+      CommittedBundle<?> bundle,
       CompletionCallback onComplete) {
     evaluateBundle(consumer, bundle, onComplete);
   }
@@ -399,19 +401,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
           pendingUpdate = allUpdates.poll();
         }
         for (ExecutorUpdate update : updates) {
-          LOG.debug("Executor Update: {}", update);
-          if (update.getBundle().isPresent()) {
-            if (ExecutorState.ACTIVE == startingState
-                || (ExecutorState.PROCESSING == startingState
-                    && noWorkOutstanding)) {
-              scheduleConsumers(update);
-            } else {
-              allUpdates.offer(update);
-            }
-          } else if (update.getException().isPresent()) {
-            visibleUpdates.offer(VisibleExecutorUpdate.fromException(update.getException().get()));
-            exceptionThrown = true;
-          }
+          applyUpdate(noWorkOutstanding, startingState, update);
         }
         addWorkIfNecessary();
       } catch (InterruptedException e) {
@@ -434,6 +424,25 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       }
     }
 
+    private void applyUpdate(
+        boolean noWorkOutstanding, ExecutorState startingState, ExecutorUpdate update) {
+      LOG.debug("Executor Update: {}", update);
+      if (update.getBundle().isPresent()) {
+        if (ExecutorState.ACTIVE == startingState
+            || (ExecutorState.PROCESSING == startingState
+                && noWorkOutstanding)) {
+          scheduleConsumers(update);
+        } else {
+          allUpdates.offer(update);
+        }
+      } else if (update.getException().isPresent()) {
+        checkState(
+            visibleUpdates.offer(VisibleExecutorUpdate.fromException(update.getException().get())),
+            "VisibleUpdates should always be able to receive an offered update");
+        exceptionThrown = true;
+      }
+    }
+
     /**
      * Fires any available timers.
      */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
index c8bf912..54cab7c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Ordering;
+import java.io.Serializable;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -129,7 +130,8 @@ class WatermarkCallbackExecutor {
     }
   }
 
-  private static class CallbackOrdering extends Ordering<WatermarkCallback> {
+  private static class CallbackOrdering extends Ordering<WatermarkCallback>
+      implements Serializable {
     @Override
     public int compare(WatermarkCallback left, WatermarkCallback right) {
       return ComparisonChain.start()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index fe2c2e5..a53c11c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.SortedMultiset;
 import com.google.common.collect.TreeMultiset;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1433,7 +1434,8 @@ public class WatermarkManager {
     }
   }
 
-  private static class BundleByElementTimestampComparator extends Ordering<CommittedBundle<?>> {
+  private static class BundleByElementTimestampComparator extends Ordering<CommittedBundle<?>>
+      implements Serializable {
     @Override
     public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
       return ComparisonChain.start()


[2/2] incubator-beam git commit: This closes #1325

Posted by tg...@apache.org.
This closes #1325


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fe17ef7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fe17ef7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fe17ef7f

Branch: refs/heads/master
Commit: fe17ef7f8f5a8037d0aeae8841b424cb76e86dcb
Parents: f0f4af5 bfa4e4e
Author: Thomas Groh <tg...@google.com>
Authored: Fri Nov 11 16:51:37 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Nov 11 16:51:37 2016 -0800

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     | 13 -------
 .../runners/direct/AggregatorContainer.java     |  2 +-
 .../direct/ExecutorServiceParallelExecutor.java | 41 ++++++++++++--------
 .../direct/WatermarkCallbackExecutor.java       |  4 +-
 .../beam/runners/direct/WatermarkManager.java   |  4 +-
 5 files changed, 32 insertions(+), 32 deletions(-)
----------------------------------------------------------------------