You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/12 00:53:07 UTC
[1/2] incubator-beam git commit: Fix FindBugs Errors in the Direct Runner
Repository: incubator-beam
Updated Branches:
refs/heads/master f0f4af581 -> fe17ef7f8
Fix FindBugs Errors in the Direct Runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfa4e4ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfa4e4ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfa4e4ec
Branch: refs/heads/master
Commit: bfa4e4ece0091efc635c5c62c3eaf955f597fa39
Parents: f0f4af5
Author: Thomas Groh <tg...@google.com>
Authored: Wed Nov 9 13:24:19 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Nov 11 16:49:09 2016 -0800
----------------------------------------------------------------------
runners/direct-java/pom.xml | 13 -------
.../runners/direct/AggregatorContainer.java | 2 +-
.../direct/ExecutorServiceParallelExecutor.java | 41 ++++++++++++--------
.../direct/WatermarkCallbackExecutor.java | 4 +-
.../beam/runners/direct/WatermarkManager.java | 4 +-
5 files changed, 32 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 43cf3c0..8983b1c 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -40,19 +40,6 @@
</resource>
</resources>
- <pluginManagement>
- <plugins>
- <!-- BEAM-924 -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>findbugs-maven-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
-
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
index 7b6bc64..e86bc3e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
@@ -43,7 +43,7 @@ public class AggregatorContainer {
private final String name;
private final CombineFn<InputT, AccumT, OutputT> combiner;
@GuardedBy("this")
- private AccumT accumulator = null;
+ private volatile AccumT accumulator = null;
private boolean committed = false;
private AggregatorInfo(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index d1ffea1..30fc417 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
@@ -30,12 +32,12 @@ import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -142,7 +144,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader());
this.allUpdates = new ConcurrentLinkedQueue<>();
- this.visibleUpdates = new ArrayBlockingQueue<>(20);
+ this.visibleUpdates = new LinkedBlockingQueue<>();
parallelExecutorService = TransformExecutorServices.parallel(executorService);
defaultCompletionCallback =
@@ -180,7 +182,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
@SuppressWarnings("unchecked")
public void scheduleConsumption(
AppliedPTransform<?, ?, ?> consumer,
- @Nullable CommittedBundle<?> bundle,
+ CommittedBundle<?> bundle,
CompletionCallback onComplete) {
evaluateBundle(consumer, bundle, onComplete);
}
@@ -399,19 +401,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
pendingUpdate = allUpdates.poll();
}
for (ExecutorUpdate update : updates) {
- LOG.debug("Executor Update: {}", update);
- if (update.getBundle().isPresent()) {
- if (ExecutorState.ACTIVE == startingState
- || (ExecutorState.PROCESSING == startingState
- && noWorkOutstanding)) {
- scheduleConsumers(update);
- } else {
- allUpdates.offer(update);
- }
- } else if (update.getException().isPresent()) {
- visibleUpdates.offer(VisibleExecutorUpdate.fromException(update.getException().get()));
- exceptionThrown = true;
- }
+ applyUpdate(noWorkOutstanding, startingState, update);
}
addWorkIfNecessary();
} catch (InterruptedException e) {
@@ -434,6 +424,25 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
}
}
+ private void applyUpdate(
+ boolean noWorkOutstanding, ExecutorState startingState, ExecutorUpdate update) {
+ LOG.debug("Executor Update: {}", update);
+ if (update.getBundle().isPresent()) {
+ if (ExecutorState.ACTIVE == startingState
+ || (ExecutorState.PROCESSING == startingState
+ && noWorkOutstanding)) {
+ scheduleConsumers(update);
+ } else {
+ allUpdates.offer(update);
+ }
+ } else if (update.getException().isPresent()) {
+ checkState(
+ visibleUpdates.offer(VisibleExecutorUpdate.fromException(update.getException().get())),
+ "VisibleUpdates should always be able to receive an offered update");
+ exceptionThrown = true;
+ }
+ }
+
/**
* Fires any available timers.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
index c8bf912..54cab7c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Ordering;
+import java.io.Serializable;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -129,7 +130,8 @@ class WatermarkCallbackExecutor {
}
}
- private static class CallbackOrdering extends Ordering<WatermarkCallback> {
+ private static class CallbackOrdering extends Ordering<WatermarkCallback>
+ implements Serializable {
@Override
public int compare(WatermarkCallback left, WatermarkCallback right) {
return ComparisonChain.start()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfa4e4ec/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index fe2c2e5..a53c11c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.collect.SortedMultiset;
import com.google.common.collect.TreeMultiset;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -1433,7 +1434,8 @@ public class WatermarkManager {
}
}
- private static class BundleByElementTimestampComparator extends Ordering<CommittedBundle<?>> {
+ private static class BundleByElementTimestampComparator extends Ordering<CommittedBundle<?>>
+ implements Serializable {
@Override
public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
return ComparisonChain.start()
[2/2] incubator-beam git commit: This closes #1325
Posted by tg...@apache.org.
This closes #1325
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fe17ef7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fe17ef7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fe17ef7f
Branch: refs/heads/master
Commit: fe17ef7f8f5a8037d0aeae8841b424cb76e86dcb
Parents: f0f4af5 bfa4e4e
Author: Thomas Groh <tg...@google.com>
Authored: Fri Nov 11 16:51:37 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Nov 11 16:51:37 2016 -0800
----------------------------------------------------------------------
runners/direct-java/pom.xml | 13 -------
.../runners/direct/AggregatorContainer.java | 2 +-
.../direct/ExecutorServiceParallelExecutor.java | 41 ++++++++++++--------
.../direct/WatermarkCallbackExecutor.java | 4 +-
.../beam/runners/direct/WatermarkManager.java | 4 +-
5 files changed, 32 insertions(+), 32 deletions(-)
----------------------------------------------------------------------