You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/11/16 21:22:59 UTC

[1/4] incubator-beam git commit: This closes #1319

Repository: incubator-beam
Updated Branches:
  refs/heads/master c695ef48b -> 15e93c58e


This closes #1319


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15e93c58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15e93c58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15e93c58

Branch: refs/heads/master
Commit: 15e93c58e933ef913c89a9654ceb567299a2fe5b
Parents: c695ef4 bf4c504
Author: Thomas Groh <tg...@google.com>
Authored: Wed Nov 16 13:22:41 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 13:22:41 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       |  3 +-
 .../beam/runners/direct/TransformExecutor.java  | 23 -----------
 .../runners/direct/TransformExecutorTest.java   | 43 --------------------
 .../apache/beam/sdk/util/TimerInternals.java    | 13 ++++--
 4 files changed, 11 insertions(+), 71 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-beam git commit: Reduce Visibility of PCollectionViewWriter

Posted by tg...@apache.org.
Reduce Visibility of PCollectionViewWriter

This is an internal implementation detail of the DirectRunner and should
not be exposed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf4c504d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf4c504d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf4c504d

Branch: refs/heads/master
Commit: bf4c504dffa05c8eccc5c2ae16dffdd53418a468
Parents: 5ba4d18
Author: Thomas Groh <tg...@google.com>
Authored: Wed Nov 16 10:37:42 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 13:22:41 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/direct/DirectRunner.java    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf4c504d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index c9a7864..04c8eb6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -183,10 +183,11 @@ public class DirectRunner
   /**
    * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
    * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
+   *
    * @param <ElemT> the type of elements the input {@link PCollection} contains.
    * @param <ViewT> the type of the PCollectionView this writer writes to.
    */
-  public interface PCollectionViewWriter<ElemT, ViewT> {
+  interface PCollectionViewWriter<ElemT, ViewT> {
     void add(Iterable<WindowedValue<ElemT>> values);
   }
 


[2/4] incubator-beam git commit: Remove unused Thread variable in TransformExecutor

Posted by tg...@apache.org.
Remove unused Thread variable in TransformExecutor


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ba4d181
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ba4d181
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ba4d181

Branch: refs/heads/master
Commit: 5ba4d181d4625d43078bb0b071635d563d925277
Parents: 3e6a4f4
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 8 14:16:23 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 13:22:41 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/TransformExecutor.java  | 23 -----------
 .../runners/direct/TransformExecutorTest.java   | 43 --------------------
 2 files changed, 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ba4d181/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index c4002b5..1704955 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -17,13 +17,9 @@
  */
 package org.apache.beam.runners.direct;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -70,8 +66,6 @@ class TransformExecutor<T> implements Runnable {
   private final TransformExecutorService transformEvaluationState;
   private final EvaluationContext context;
 
-  private final AtomicReference<Thread> thread;
-
   private TransformExecutor(
       EvaluationContext context,
       TransformEvaluatorFactory factory,
@@ -90,20 +84,12 @@ class TransformExecutor<T> implements Runnable {
 
     this.transformEvaluationState = transformEvaluationState;
     this.context = context;
-    this.thread = new AtomicReference<>();
   }
 
   @Override
   public void run() {
     MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName());
     MetricsEnvironment.setMetricsContainer(metricsContainer);
-    checkState(
-        thread.compareAndSet(null, Thread.currentThread()),
-        "Tried to execute %s for %s on thread %s, but is already executing on thread %s",
-        TransformExecutor.class.getSimpleName(),
-        transform.getFullName(),
-        Thread.currentThread(),
-        thread.get());
     try {
       Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
       for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
@@ -186,13 +172,4 @@ class TransformExecutor<T> implements Runnable {
     }
     return result;
   }
-
-  /**
-   * If this {@link TransformExecutor} is currently executing, return the thread it is executing in.
-   * Otherwise, return null.
-   */
-  @Nullable
-  public Thread getThread() {
-    return thread.get();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ba4d181/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 32f874d..0b7b882 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
-import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
@@ -277,48 +276,6 @@ public class TransformExecutorTest {
   }
 
   @Test
-  public void duringCallGetThreadIsNonNull() throws Exception {
-    final TransformResult result =
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
-    final CountDownLatch testLatch = new CountDownLatch(1);
-    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
-    TransformEvaluator<Object> evaluator =
-        new TransformEvaluator<Object>() {
-          @Override
-          public void processElement(WindowedValue<Object> element) throws Exception {
-            throw new IllegalArgumentException("Shouldn't be called");
-          }
-
-          @Override
-          public TransformResult finishBundle() throws Exception {
-            testLatch.countDown();
-            evaluatorLatch.await();
-            return result;
-          }
-        };
-
-    when(registry.forApplication(created.getProducingTransformInternal(), null))
-        .thenReturn(evaluator);
-
-    TransformExecutor<String> executor =
-        TransformExecutor.create(
-            evaluationContext,
-            registry,
-            Collections.<ModelEnforcementFactory>emptyList(),
-            null,
-            created.getProducingTransformInternal(),
-            completionCallback,
-            transformEvaluationState);
-
-    Executors.newSingleThreadExecutor().submit(executor);
-    testLatch.await();
-    assertThat(executor.getThread(), not(nullValue()));
-
-    // Finish the execution so everything can get closed down cleanly.
-    evaluatorLatch.countDown();
-  }
-
-  @Test
   public void callWithEnforcementAppliesEnforcement() throws Exception {
     final TransformResult result =
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();


[4/4] incubator-beam git commit: Reduce incidence of Namespace StringKey comparisons

Posted by tg...@apache.org.
Reduce incidence of Namespace StringKey comparisons

If the Namespace of a TimerData reports itself as being equal to the
other namespace, immediately return 0 rather than generating the string
keys and comparing them.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3e6a4f4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3e6a4f4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3e6a4f4a

Branch: refs/heads/master
Commit: 3e6a4f4a49344871430d2711e934b0493c17499f
Parents: c695ef4
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 8 14:18:58 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 16 13:22:41 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/TimerInternals.java  | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e6a4f4a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index 743f3f7..5d4a72d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -230,12 +230,17 @@ public interface TimerInternals {
      * arbitrary.
      */
     @Override
-    public int compareTo(TimerData o) {
+    public int compareTo(TimerData that) {
+      if (this.equals(that)) {
+        return 0;
+      }
       ComparisonChain chain =
-          ComparisonChain.start().compare(timestamp, o.getTimestamp()).compare(domain, o.domain);
-      if (chain.result() == 0) {
+          ComparisonChain.start()
+              .compare(this.timestamp, that.getTimestamp())
+              .compare(this.domain, that.domain);
+      if (chain.result() == 0 && !this.namespace.equals(that.namespace)) {
         // Obtaining the stringKey may be expensive; only do so if required
-        chain = chain.compare(namespace.stringKey(), o.namespace.stringKey());
+        chain = chain.compare(namespace.stringKey(), that.namespace.stringKey());
       }
       return chain.result();
     }