You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 23:01:09 UTC
[1/2] incubator-beam git commit: [BEAM-815] Fix ApexPipelineOptions
conversion and add ApexRunnerRegistrar
Repository: incubator-beam
Updated Branches:
refs/heads/apex-runner 989e39987 -> 638117dc8
[BEAM-815] Fix ApexPipelineOptions conversion and add ApexRunnerRegistrar
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/11a9f348
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/11a9f348
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/11a9f348
Branch: refs/heads/apex-runner
Commit: 11a9f348ddc01851775a546ae500154c52a8f048
Parents: c9f1406
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Mon Oct 24 11:30:46 2016 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Tue Oct 25 12:06:00 2016 +0200
----------------------------------------------------------------------
runners/apex/pom.xml | 6 +-
.../apache/beam/runners/apex/ApexRunner.java | 5 +-
.../beam/runners/apex/ApexRunnerRegistrar.java | 61 ++++++++++++++++++++
.../apex/translators/utils/ApexStreamTuple.java | 2 +-
.../runners/apex/ApexRunnerRegistrarTest.java | 30 ++++++++++
5 files changed, 99 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11a9f348/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 8b62410..de191f5 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -138,11 +138,11 @@
<scope>test</scope>
</dependency-->
<!-- Optional Pipeline Registration -->
- <!--dependency>
+ <dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<optional>true</optional>
- </dependency-->
+ </dependency>
</dependencies>
<build>
@@ -183,7 +183,7 @@
<systemPropertyVariables>
<beamTestPipelineOptions>
[
- "--runner=org.apache.beam.runners.apex.TestApexRunner"
+ "--runner=TestApexRunner"
]
</beamTestPipelineOptions>
</systemPropertyVariables>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11a9f348/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index f3c44bb..8da4ec3 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.apex.translators.TranslationContext;
import org.apache.beam.runners.core.AssignWindows;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
@@ -75,7 +76,9 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
public static ApexRunner fromOptions(PipelineOptions options) {
- return new ApexRunner((ApexPipelineOptions) options);
+ ApexPipelineOptions apexPipelineOptions =
+ PipelineOptionsValidator.validate(ApexPipelineOptions.class, options);
+ return new ApexRunner(apexPipelineOptions);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11a9f348/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
new file mode 100644
index 0000000..aa6ef45
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link ApexRunner}.
+ *
+ * <p>{@link AutoService} will register Apex's implementations of the {@link PipelineRunner}
+ * and {@link PipelineOptions} as available pipeline runner services.
+ */
+public final class ApexRunnerRegistrar {
+ private ApexRunnerRegistrar() {}
+
+ /**
+ * Registers the {@link ApexRunner} and the {@link TestApexRunner}.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class Runner implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList
+ .<Class<? extends PipelineRunner<?>>>of(ApexRunner.class, TestApexRunner.class);
+ }
+ }
+
+ /**
+ * Registers the {@link ApexPipelineOptions} as the only options interface of this runner.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class Options implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(ApexPipelineOptions.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11a9f348/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
index a260a66..7f8b0fa 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -134,7 +134,7 @@ public interface ApexStreamTuple<T> {
/**
* Coder for {@link ApexStreamTuple}.
*/
- public static class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
+ class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
private static final long serialVersionUID = 1L;
final Coder<T> valueCoder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11a9f348/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java
new file mode 100644
index 0000000..6f32da5
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java
@@ -0,0 +1,30 @@
+package org.apache.beam.runners.apex;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+/**
+ * Tests that {@code --runner} resolves {@link ApexRunner} by its full and by its simple name.
+ */
+public class ApexRunnerRegistrarTest {
+
+ @Test
+ public void testFullName() {
+ String[] args =
+ new String[] {String.format("--runner=%s", ApexRunner.class.getName())};
+ PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+ assertEquals(ApexRunner.class, opts.getRunner()); // JUnit order: expected, actual
+ }
+
+ @Test
+ public void testClassName() {
+ String[] args =
+ new String[] {String.format("--runner=%s", ApexRunner.class.getSimpleName())};
+ PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+ assertEquals(ApexRunner.class, opts.getRunner()); // JUnit order: expected, actual
+ }
+
+}
[2/2] incubator-beam git commit: Closes #1167
Posted by th...@apache.org.
Closes #1167
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/638117dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/638117dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/638117dc
Branch: refs/heads/apex-runner
Commit: 638117dc871c52ba1408d342901ae095c33e2319
Parents: 989e399 11a9f34
Author: Thomas Weise <th...@apache.org>
Authored: Tue Oct 25 13:48:12 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Oct 25 13:48:12 2016 -0700
----------------------------------------------------------------------
runners/apex/pom.xml | 6 +-
.../apache/beam/runners/apex/ApexRunner.java | 5 +-
.../beam/runners/apex/ApexRunnerRegistrar.java | 61 ++++++++++++++++++++
.../runners/apex/ApexRunnerRegistrarTest.java | 30 ++++++++++
4 files changed, 98 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/638117dc/runners/apex/pom.xml
----------------------------------------------------------------------