You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/10 02:27:28 UTC
[1/3] incubator-beam git commit: Execute NeedsRunner tests in the
Direct Runner
Repository: incubator-beam
Updated Branches:
refs/heads/master d53e96a0d -> 829121960
Execute NeedsRunner tests in the Direct Runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9e797e3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9e797e3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9e797e3d
Branch: refs/heads/master
Commit: 9e797e3d3a6bda2ff42b8c3acf8fd60fec179aa4
Parents: ee1297e
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 20 15:18:11 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Jun 9 15:36:48 2016 -0700
----------------------------------------------------------------------
runners/direct-java/pom.xml | 25 ++++++++++++++++++++++++-
1 file changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e797e3d/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index fb2d2e1..def7207 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -86,7 +86,7 @@
<goal>test</goal>
</goals>
<configuration>
- <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <groups>org.apache.beam.sdk.testing.NeedsRunner</groups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
<dependenciesToScan>
@@ -340,5 +340,28 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <!-- required for XMLSourceTest -->
+ <dependency>
+ <groupId>org.codehaus.woodstox</groupId>
+ <artifactId>stax2-api</artifactId>
+ <version>${stax2.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.woodstox</groupId>
+ <artifactId>woodstox-core-asl</artifactId>
+ <version>${woodstox.version}</version>
+ <exclusions>
+ <!-- javax.xml.stream:stax-api is included in JDK 1.6+ -->
+ <exclusion>
+ <groupId>javax.xml.stream</groupId>
+ <artifactId>stax-api</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
[3/3] incubator-beam git commit: This closes #392
Posted by ke...@apache.org.
This closes #392
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/82912196
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/82912196
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/82912196
Branch: refs/heads/master
Commit: 82912196080b7ae79b39e355d10077ad8654eda6
Parents: d53e96a 9e797e3
Author: Kenn Knowles <ke...@kennknowles.com>
Authored: Thu Jun 9 19:26:23 2016 -0700
Committer: Kenn Knowles <ke...@kennknowles.com>
Committed: Thu Jun 9 19:26:23 2016 -0700
----------------------------------------------------------------------
runners/direct-java/pom.xml | 25 +++++++++++++++++++-
.../direct/InProcessEvaluationContext.java | 5 ++--
.../direct/WatermarkCallbackExecutor.java | 10 ++++----
3 files changed, 32 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Use a DirectExecutor for Watermark
Callbacks
Posted by ke...@apache.org.
Use a DirectExecutor for Watermark Callbacks
This fixes a resource leak where the executor service is not properly
shut down with the rest of the DirectRunner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ee1297e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ee1297e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ee1297e2
Branch: refs/heads/master
Commit: ee1297e21a481fbea52475c0732526a0441d03cb
Parents: d53e96a
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 7 17:50:38 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Jun 9 15:36:48 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/InProcessEvaluationContext.java | 5 +++--
.../beam/runners/direct/WatermarkCallbackExecutor.java | 10 +++++-----
2 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee1297e2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index 981a842..db8baa0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -46,13 +46,13 @@ import org.apache.beam.sdk.values.PValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import javax.annotation.Nullable;
@@ -130,7 +130,8 @@ class InProcessEvaluationContext {
this.applicationStateInternals = new ConcurrentHashMap<>();
this.mergedCounters = new CounterSet();
- this.callbackExecutor = WatermarkCallbackExecutor.create(Executors.newSingleThreadExecutor());
+ this.callbackExecutor =
+ WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee1297e2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
index 1c9b050..0f73b1d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
@@ -29,7 +29,7 @@ import org.joda.time.Instant;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
/**
* Executes callbacks that occur based on the progression of the watermark per-step.
@@ -51,15 +51,15 @@ class WatermarkCallbackExecutor {
/**
* Create a new {@link WatermarkCallbackExecutor}.
*/
- public static WatermarkCallbackExecutor create(ExecutorService executor) {
+ public static WatermarkCallbackExecutor create(Executor executor) {
return new WatermarkCallbackExecutor(executor);
}
private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>
callbacks;
- private final ExecutorService executor;
+ private final Executor executor;
- private WatermarkCallbackExecutor(ExecutorService executor) {
+ private WatermarkCallbackExecutor(Executor executor) {
this.callbacks = new ConcurrentHashMap<>();
this.executor = executor;
}
@@ -101,7 +101,7 @@ class WatermarkCallbackExecutor {
}
synchronized (callbackQueue) {
while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) {
- executor.submit(callbackQueue.poll().getCallback());
+ executor.execute(callbackQueue.poll().getCallback());
}
}
}