You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/10 02:27:28 UTC

[1/3] incubator-beam git commit: Execute NeedsRunner tests in the Direct Runner

Repository: incubator-beam
Updated Branches:
  refs/heads/master d53e96a0d -> 829121960


Execute NeedsRunner tests in the Direct Runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9e797e3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9e797e3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9e797e3d

Branch: refs/heads/master
Commit: 9e797e3d3a6bda2ff42b8c3acf8fd60fec179aa4
Parents: ee1297e
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 20 15:18:11 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Jun 9 15:36:48 2016 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e797e3d/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index fb2d2e1..def7207 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -86,7 +86,7 @@
               <goal>test</goal>
             </goals>
             <configuration>
-              <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+              <groups>org.apache.beam.sdk.testing.NeedsRunner</groups>
               <parallel>none</parallel>
               <failIfNoTests>true</failIfNoTests>
               <dependenciesToScan>
@@ -340,5 +340,28 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+
+    <!-- required for XMLSourceTest -->
+    <dependency>
+      <groupId>org.codehaus.woodstox</groupId>
+      <artifactId>stax2-api</artifactId>
+      <version>${stax2.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.woodstox</groupId>
+      <artifactId>woodstox-core-asl</artifactId>
+      <version>${woodstox.version}</version>
+      <exclusions>
+        <!-- javax.xml.stream:stax-api is included in JDK 1.6+ -->
+        <exclusion>
+          <groupId>javax.xml.stream</groupId>
+          <artifactId>stax-api</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 </project>


[3/3] incubator-beam git commit: This closes #392

Posted by ke...@apache.org.
This closes #392


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/82912196
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/82912196
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/82912196

Branch: refs/heads/master
Commit: 82912196080b7ae79b39e355d10077ad8654eda6
Parents: d53e96a 9e797e3
Author: Kenn Knowles <ke...@kennknowles.com>
Authored: Thu Jun 9 19:26:23 2016 -0700
Committer: Kenn Knowles <ke...@kennknowles.com>
Committed: Thu Jun 9 19:26:23 2016 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     | 25 +++++++++++++++++++-
 .../direct/InProcessEvaluationContext.java      |  5 ++--
 .../direct/WatermarkCallbackExecutor.java       | 10 ++++----
 3 files changed, 32 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Use a DirectExecutor for Watermark Callbacks

Posted by ke...@apache.org.
Use a DirectExecutor for Watermark Callbacks

This fixes a resource leak in which the single-thread executor service used
for watermark callbacks was not properly shut down with the rest of the
DirectRunner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ee1297e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ee1297e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ee1297e2

Branch: refs/heads/master
Commit: ee1297e21a481fbea52475c0732526a0441d03cb
Parents: d53e96a
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 7 17:50:38 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Jun 9 15:36:48 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/InProcessEvaluationContext.java   |  5 +++--
 .../beam/runners/direct/WatermarkCallbackExecutor.java    | 10 +++++-----
 2 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee1297e2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index 981a842..db8baa0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -46,13 +46,13 @@ import org.apache.beam.sdk.values.PValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
 
 import javax.annotation.Nullable;
 
@@ -130,7 +130,8 @@ class InProcessEvaluationContext {
     this.applicationStateInternals = new ConcurrentHashMap<>();
     this.mergedCounters = new CounterSet();
 
-    this.callbackExecutor = WatermarkCallbackExecutor.create(Executors.newSingleThreadExecutor());
+    this.callbackExecutor =
+        WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee1297e2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
index 1c9b050..0f73b1d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
@@ -29,7 +29,7 @@ import org.joda.time.Instant;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 
 /**
  * Executes callbacks that occur based on the progression of the watermark per-step.
@@ -51,15 +51,15 @@ class WatermarkCallbackExecutor {
   /**
    * Create a new {@link WatermarkCallbackExecutor}.
    */
-  public static WatermarkCallbackExecutor create(ExecutorService executor) {
+  public static WatermarkCallbackExecutor create(Executor executor) {
     return new WatermarkCallbackExecutor(executor);
   }
 
   private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>
       callbacks;
-  private final ExecutorService executor;
+  private final Executor executor;
 
-  private WatermarkCallbackExecutor(ExecutorService executor) {
+  private WatermarkCallbackExecutor(Executor executor) {
     this.callbacks = new ConcurrentHashMap<>();
     this.executor = executor;
   }
@@ -101,7 +101,7 @@ class WatermarkCallbackExecutor {
     }
     synchronized (callbackQueue) {
       while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) {
-        executor.submit(callbackQueue.poll().getCallback());
+        executor.execute(callbackQueue.poll().getCallback());
       }
     }
   }