You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this copy.)
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/03 04:26:46 UTC
[1/2] incubator-beam git commit: BEAM-858 Enable ApexRunner integration test in examples.
Repository: incubator-beam
Updated Branches:
refs/heads/apex-runner 968eb32b8 -> 51af7e592
BEAM-858 Enable ApexRunner integration test in examples.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77f4ba2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77f4ba2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77f4ba2e
Branch: refs/heads/apex-runner
Commit: 77f4ba2eebf0c02dc95a8b90b4277a12638c4300
Parents: 968eb32
Author: Thomas Weise <th...@apache.org>
Authored: Fri Oct 28 23:47:42 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Nov 1 03:35:15 2016 +0100
----------------------------------------------------------------------
examples/java/pom.xml | 31 ++++++++++++++++++++
runners/apex/pom.xml | 2 +-
.../beam/runners/apex/ApexPipelineOptions.java | 6 ----
.../runners/apex/ApexPipelineTranslator.java | 2 +-
.../apache/beam/runners/apex/ApexRunner.java | 12 --------
.../beam/runners/apex/ApexRunnerResult.java | 27 +++++++++++++++--
.../beam/runners/apex/TestApexRunner.java | 20 +++++++++++--
.../io/ApexReadUnboundedInputOperator.java | 13 ++++++--
.../apex/examples/StreamingWordCountTest.java | 4 +--
.../translators/ParDoBoundTranslatorTest.java | 6 ++--
10 files changed, 91 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index fc82ed4..6c1a16f 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -85,6 +85,14 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-apex</artifactId>
+ <version>${project.version}</version>
+ <scope>runtime</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
@@ -224,6 +232,29 @@
</systemPropertyVariables>
</configuration>
</execution>
+ <execution>
+ <id>apex-runner-integration-tests</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>WordCountIT.java</include>
+ </includes>
+ <parallel>all</parallel>
+ <threadCount>4</threadCount>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ [
+ "--project=apache-beam-testing",
+ "--tempRoot=gs://temp-storage-for-end-to-end-tests",
+ "--runner=org.apache.beam.runners.apex.TestApexRunner"
+ ]
+ </beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 6ccc0da..d4bcc3d 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -72,7 +72,7 @@
<groupId>org.apache.apex</groupId>
<artifactId>apex-engine</artifactId>
<version>${apex.core.version}</version>
- <scope>test</scope>
+ <scope>runtime</scope>
</dependency>
<!--- Beam -->
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
index 141a8c1..54fdf76 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -32,12 +32,6 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab
String getApplicationName();
- @Description("set parallelism for Apex runner")
- void setParallelism(int parallelism);
-
- @Default.Integer(1)
- int getParallelism();
-
@Description("execute the pipeline with embedded cluster")
void setEmbeddedExecution(boolean embedded);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index a6857ee..8a87ce0 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -150,7 +150,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(
transform.getSource());
ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
- unboundedSource, context.getPipelineOptions());
+ unboundedSource, true, context.getPipelineOptions());
context.addOperator(operator, operator.output);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 8da4ec3..416e99c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -137,18 +137,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
assertionError = null;
lc.runAsync();
- if (options.getRunMillis() > 0) {
- try {
- long timeout = System.currentTimeMillis() + options.getRunMillis();
- while (System.currentTimeMillis() < timeout) {
- if (assertionError != null) {
- throw assertionError;
- }
- }
- } finally {
- lc.shutdown();
- }
- }
return new ApexRunnerResult(lma.getDAG(), lc);
} catch (Exception e) {
Throwables.propagateIfPossible(e);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 4e3a8d2..03428a6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -21,6 +21,7 @@ import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import java.io.IOException;
+import java.lang.reflect.Field;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
@@ -63,12 +64,12 @@ public class ApexRunnerResult implements PipelineResult {
@Override
public State waitUntilFinish(Duration duration) {
- throw new UnsupportedOperationException();
+ return ApexRunnerResult.waitUntilFinished(ctrl, duration);
}
@Override
public State waitUntilFinish() {
- throw new UnsupportedOperationException();
+ return ApexRunnerResult.waitUntilFinished(ctrl, null);
}
@Override
@@ -84,4 +85,26 @@ public class ApexRunnerResult implements PipelineResult {
return apexDAG;
}
+ public static State waitUntilFinished(LocalMode.Controller ctrl, Duration duration) {
+ // we need to rely on internal field for now
+ // Apex should make it available through API in upcoming release.
+ long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE
+ : System.currentTimeMillis() + duration.getMillis();
+ Field appDoneField;
+ try {
+ appDoneField = ctrl.getClass().getDeclaredField("appDone");
+ appDoneField.setAccessible(true);
+ while (!appDoneField.getBoolean(ctrl) && System.currentTimeMillis() < timeout) {
+ if (ApexRunner.assertionError != null) {
+ throw ApexRunner.assertionError;
+ }
+ Thread.sleep(500);
+ }
+ return appDoneField.getBoolean(ctrl) ? State.DONE : null;
+ } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
+ | IllegalAccessException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
index 2e048f0..e447e37 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.apex;
+import java.io.IOException;
+
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
@@ -24,18 +26,19 @@ import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
+import org.joda.time.Duration;
/**
* Apex {@link PipelineRunner} for testing.
*/
public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {
- private ApexRunner delegate;
+ private static final int RUN_WAIT_MILLIS = 20000;
+ private final ApexRunner delegate;
private TestApexRunner(ApexPipelineOptions options) {
options.setEmbeddedExecution(true);
//options.setEmbeddedExecutionDebugMode(false);
- options.setRunMillis(20000);
this.delegate = ApexRunner.fromOptions(options);
}
@@ -53,7 +56,18 @@ public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {
@Override
public ApexRunnerResult run(Pipeline pipeline) {
- return delegate.run(pipeline);
+ ApexRunnerResult result = delegate.run(pipeline);
+ try {
+ // this is necessary for tests that just call run() and not waitUntilFinish
+ result.waitUntilFinish(Duration.millis(RUN_WAIT_MILLIS));
+ return result;
+ } finally {
+ try {
+ result.cancel();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
index 3188dfa..0e2b0c2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
@@ -55,6 +55,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
private final SerializablePipelineOptions pipelineOptions;
@Bind(JavaSerializer.class)
private final UnboundedSource<OutputT, CheckpointMarkT> source;
+ private final boolean isBoundedSource;
private transient UnboundedSource.UnboundedReader<OutputT> reader;
private transient boolean available = false;
@OutputPortFieldAnnotation(optional = true)
@@ -65,16 +66,24 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
ApexPipelineOptions options) {
this.pipelineOptions = new SerializablePipelineOptions(options);
this.source = source;
+ this.isBoundedSource = false;
+ }
+
+ public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
+ boolean isBoundedSource, ApexPipelineOptions options) {
+ this.pipelineOptions = new SerializablePipelineOptions(options);
+ this.source = source;
+ this.isBoundedSource = isBoundedSource;
}
@SuppressWarnings("unused") // for Kryo
private ApexReadUnboundedInputOperator() {
- this.pipelineOptions = null; this.source = null;
+ this.pipelineOptions = null; this.source = null; this.isBoundedSource = false;
}
@Override
public void beginWindow(long windowId) {
- if (!available && source instanceof ValuesSource) {
+ if (!available && (isBoundedSource || source instanceof ValuesSource)) {
// if it's a Create and the input was consumed, emit final watermark
emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
// terminate the stream (allows tests to finish faster)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
index 6ab2e8e..363e669 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
@@ -91,7 +91,6 @@ public class StreamingWordCountTest {
ApexPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(ApexPipelineOptions.class);
options.setApplicationName("StreamingWordCount");
- options.setParallelism(1);
Pipeline p = Pipeline.create(options);
PCollection<KV<String, Long>> wordCounts =
@@ -110,7 +109,7 @@ public class StreamingWordCountTest {
&& FormatAsStringFn.RESULTS.containsKey("bar")) {
break;
}
- Thread.sleep(1000);
+ result.waitUntilFinish(Duration.millis(1000));
}
result.cancel();
Assert.assertTrue(
@@ -118,4 +117,5 @@ public class StreamingWordCountTest {
FormatAsStringFn.RESULTS.clear();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index 6f50398..2379a9e 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -192,7 +192,7 @@ public class ParDoBoundTranslatorTest {
Pipeline pipeline = Pipeline.create(options);
PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
- // TODO: good candidate to terminate fast based on processed assertion vs. for auto-shutdown
+ // TODO: terminate faster based on processed assertion vs. auto-shutdown
pipeline.run();
}
@@ -263,8 +263,8 @@ public class ParDoBoundTranslatorTest {
Pipeline pipeline = Pipeline.create(options);
List<Integer> inputs = Arrays.asList(3, -42, 666);
- final TupleTag<String> mainOutputTag = new TupleTag<String>("main");
- final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput");
+ final TupleTag<String> mainOutputTag = new TupleTag<>("main");
+ final TupleTag<Void> sideOutputTag = new TupleTag<>("sideOutput");
PCollectionView<Integer> sideInput1 = pipeline
.apply("CreateSideInput1", Create.of(11))
[2/2] incubator-beam git commit: Closes #1227
Posted by th...@apache.org.
Closes #1227
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51af7e59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51af7e59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51af7e59
Branch: refs/heads/apex-runner
Commit: 51af7e592327ef711673a2b9828536051a6c3898
Parents: 968eb32 77f4ba2
Author: Thomas Weise <th...@apache.org>
Authored: Wed Nov 2 21:16:24 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Wed Nov 2 21:16:24 2016 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 31 ++++++++++++++++++++
runners/apex/pom.xml | 2 +-
.../beam/runners/apex/ApexPipelineOptions.java | 6 ----
.../runners/apex/ApexPipelineTranslator.java | 2 +-
.../apache/beam/runners/apex/ApexRunner.java | 12 --------
.../beam/runners/apex/ApexRunnerResult.java | 27 +++++++++++++++--
.../beam/runners/apex/TestApexRunner.java | 20 +++++++++++--
.../io/ApexReadUnboundedInputOperator.java | 13 ++++++--
.../apex/examples/StreamingWordCountTest.java | 4 +--
.../translators/ParDoBoundTranslatorTest.java | 6 ++--
10 files changed, 91 insertions(+), 32 deletions(-)
----------------------------------------------------------------------