You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/09 18:46:55 UTC
[2/4] incubator-beam git commit: Use AutoValue for ExecutorUpdate
Use AutoValue for ExecutorUpdate
Explicitly provide the collections the Bundle should be consumed by in
the update.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59cca8dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59cca8dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59cca8dd
Branch: refs/heads/master
Commit: 59cca8ddae3d544beea9684719409efe3acbe634
Parents: e7df160
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 6 10:33:32 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 6 10:55:53 2016 -0700
----------------------------------------------------------------------
.../direct/ExecutorServiceParallelExecutor.java | 59 ++++++++++----------
1 file changed, 30 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59cca8dd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 6f26b6b..fd4cc2c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
+import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
@@ -191,8 +192,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
return keyedPValues.contains(pvalue);
}
- private void scheduleConsumers(CommittedBundle<?> bundle) {
- for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) {
+ private void scheduleConsumers(ExecutorUpdate update) {
+ CommittedBundle<?> bundle = update.getBundle().get();
+ for (AppliedPTransform<?, ?, ?> consumer : update.getConsumers()) {
scheduleConsumption(consumer, bundle, defaultCompletionCallback);
}
}
@@ -225,7 +227,8 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
CommittedBundle<?> inputBundle, InProcessTransformResult result) {
CommittedResult committedResult = getCommittedResult(inputBundle, result);
for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
- allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
+ allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,
+ valueToConsumers.get(outputBundle.getPCollection())));
}
return committedResult;
}
@@ -276,38 +279,36 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
*
* Used to signal when the executor should be shut down (due to an exception).
*/
- private static class ExecutorUpdate {
- private final Optional<? extends CommittedBundle<?>> bundle;
- private final Optional<? extends Throwable> throwable;
-
- public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) {
- return new ExecutorUpdate(bundle, null);
+ @AutoValue
+ abstract static class ExecutorUpdate {
+ public static ExecutorUpdate fromBundle(
+ CommittedBundle<?> bundle,
+ Collection<AppliedPTransform<?, ?, ?>> consumers) {
+ return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
+ Optional.of(bundle),
+ consumers,
+ Optional.<Throwable>absent());
}
public static ExecutorUpdate fromThrowable(Throwable t) {
- return new ExecutorUpdate(null, t);
- }
-
- private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable throwable) {
- this.bundle = Optional.fromNullable(producedBundle);
- this.throwable = Optional.fromNullable(throwable);
+ return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
+ Optional.<CommittedBundle<?>>absent(),
+ Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+ Optional.of(t));
}
- public Optional<? extends CommittedBundle<?>> getBundle() {
- return bundle;
- }
+ /**
+ * Returns the bundle that produced this update.
+ */
+ public abstract Optional<? extends CommittedBundle<?>> getBundle();
- public Optional<? extends Throwable> getException() {
- return throwable;
- }
+ /**
+ * Returns the transforms to process the bundle. If nonempty, {@link #getBundle()} will return
+ * a present {@link Optional}.
+ */
+ public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers();
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(ExecutorUpdate.class)
- .add("bundle", bundle)
- .add("exception", throwable)
- .toString();
- }
+ public abstract Optional<? extends Throwable> getException();
}
/**
@@ -353,7 +354,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
while (update != null) {
LOG.debug("Executor Update: {}", update);
if (update.getBundle().isPresent()) {
- scheduleConsumers(update.getBundle().get());
+ scheduleConsumers(update);
} else if (update.getException().isPresent()) {
visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
}