You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/05 17:40:30 UTC

[1/4] incubator-beam git commit: Remove unneccssary Assignment in TransformExecutor

Repository: incubator-beam
Updated Branches:
  refs/heads/master 2b5c6bcb2 -> c584b37b8


Remove unneccssary Assignment in TransformExecutor


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/acf71d31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/acf71d31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/acf71d31

Branch: refs/heads/master
Commit: acf71d313a43c8f39213ce19277ffebadcc40a77
Parents: 4546fd9
Author: Thomas Groh <tg...@google.com>
Authored: Fri Aug 5 09:59:46 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Aug 5 10:04:21 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/direct/TransformExecutor.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/acf71d31/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 3db941d..d873bf5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -118,7 +118,7 @@ class TransformExecutor<T> implements Runnable {
 
       processElements(evaluator, enforcements);
 
-      TransformResult result = finishBundle(evaluator, enforcements);
+      finishBundle(evaluator, enforcements);
     } catch (Throwable t) {
       onComplete.handleThrowable(inputBundle, t);
       if (t instanceof RuntimeException) {


[4/4] incubator-beam git commit: This closes #795

Posted by ke...@apache.org.
This closes #795


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c584b37b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c584b37b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c584b37b

Branch: refs/heads/master
Commit: c584b37b8ac4e863bad83a766b6871ccc7135334
Parents: 2b5c6bc acf71d3
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Aug 5 10:40:17 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Aug 5 10:40:17 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/EvaluationContext.java  | 14 ++++++---
 .../direct/ExecutorServiceParallelExecutor.java |  5 ++--
 .../beam/runners/direct/TransformExecutor.java  |  2 +-
 .../beam/runners/direct/DirectRunnerTest.java   | 31 ++++++++++++++++++++
 4 files changed, 44 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-beam git commit: Remove References to Instant#now in the DirectRunner

Posted by ke...@apache.org.
Remove References to Instant#now in the DirectRunner

The DirectRunner should use exclusively the configured clock to
determine the processing time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7585cfc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7585cfc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7585cfc3

Branch: refs/heads/master
Commit: 7585cfc3693800b00c4ccc799c27f0311e9b0cc1
Parents: fcf6b1d
Author: Thomas Groh <tg...@google.com>
Authored: Fri Aug 5 09:58:05 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Aug 5 10:04:21 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/EvaluationContext.java | 14 ++++++++++----
 .../direct/ExecutorServiceParallelExecutor.java       |  5 ++---
 2 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7585cfc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 23c139d..94f28e2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -48,6 +48,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.MoreExecutors;
 
+import org.joda.time.Instant;
+
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
@@ -81,6 +83,7 @@ class EvaluationContext {
 
   /** The options that were used to create this {@link Pipeline}. */
   private final DirectOptions options;
+  private final Clock clock;
 
   private final BundleFactory bundleFactory;
   /** The current processing time and event time watermarks and timers. */
@@ -116,6 +119,7 @@ class EvaluationContext {
       Map<AppliedPTransform<?, ?, ?>, String> stepNames,
       Collection<PCollectionView<?>> views) {
     this.options = checkNotNull(options);
+    this.clock = options.getClock();
     this.bundleFactory = checkNotNull(bundleFactory);
     checkNotNull(rootTransforms);
     checkNotNull(valueToConsumers);
@@ -123,9 +127,7 @@ class EvaluationContext {
     checkNotNull(views);
     this.stepNames = stepNames;
 
-    this.watermarkManager =
-        WatermarkManager.create(
-            NanosOffsetClock.create(), rootTransforms, valueToConsumers);
+    this.watermarkManager = WatermarkManager.create(clock, rootTransforms, valueToConsumers);
     this.sideInputContainer = SideInputContainer.create(this, views);
 
     this.applicationStateInternals = new ConcurrentHashMap<>();
@@ -314,7 +316,7 @@ class EvaluationContext {
       AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) {
     StepAndKey stepAndKey = StepAndKey.of(application, key);
     return new DirectExecutionContext(
-        options.getClock(),
+        clock,
         key,
         (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
         watermarkManager.getWatermarks(application));
@@ -427,4 +429,8 @@ class EvaluationContext {
     }
     return true;
   }
+
+  public Instant now() {
+    return clock.now();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7585cfc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 64836d8..a0a5ec0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -40,7 +40,6 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
-import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -433,9 +432,9 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
                       .createKeyedBundle(
                           null, keyTimers.getKey(), (PCollection) transform.getInput())
                       .add(WindowedValue.valueInEmptyWindows(work))
-                      .commit(Instant.now());
-              state.set(ExecutorState.ACTIVE);
+                      .commit(evaluationContext.now());
               scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery));
+              state.set(ExecutorState.ACTIVE);
             }
           }
         }


[2/4] incubator-beam git commit: Add DirectRunner Reuse Test

Posted by ke...@apache.org.
Add DirectRunner Reuse Test

Two calls to run using the Direct Runner should be independent and
succeed independently.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4546fd9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4546fd9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4546fd9c

Branch: refs/heads/master
Commit: 4546fd9c5e073eb33787faa302b8695dfd6e04aa
Parents: 7585cfc
Author: Thomas Groh <tg...@google.com>
Authored: Fri Aug 5 09:58:59 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Aug 5 10:04:21 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunnerTest.java   | 31 ++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4546fd9c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 29dea32..1e73ec0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -109,6 +109,37 @@ public class DirectRunnerTest implements Serializable {
     result.awaitCompletion();
   }
 
+  @Test
+  public void reusePipelineSucceeds() throws Throwable {
+    Pipeline p = getPipeline();
+
+    PCollection<KV<String, Long>> counts =
+        p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo"))
+            .apply(MapElements.via(new SimpleFunction<String, String>() {
+              @Override
+              public String apply(String input) {
+                return input;
+              }
+            }))
+            .apply(Count.<String>perElement());
+    PCollection<String> countStrs =
+        counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
+          @Override
+          public String apply(KV<String, Long> input) {
+            String str = String.format("%s: %s", input.getKey(), input.getValue());
+            return str;
+          }
+        }));
+
+    PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
+
+    DirectPipelineResult result = ((DirectPipelineResult) p.run());
+    result.awaitCompletion();
+
+    DirectPipelineResult otherResult = ((DirectPipelineResult) p.run());
+    otherResult.awaitCompletion();
+  }
+
   @Test(timeout = 5000L)
   public void byteArrayCountShouldSucceed() {
     Pipeline p = getPipeline();