You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:35 UTC
[17/39] incubator-beam git commit: [BEAM-815] Fix ApexPipelineOptions
conversion and add ApexRunnerRegistrar
[BEAM-815] Fix ApexPipelineOptions conversion and add ApexRunnerRegistrar
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8f8a80d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8f8a80d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8f8a80d
Branch: refs/heads/master
Commit: c8f8a80d4c6846fd941fbba08727b7a3ecaca7e1
Parents: c9f1406
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Mon Oct 24 11:30:46 2016 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Tue Oct 25 22:36:19 2016 +0200
----------------------------------------------------------------------
runners/apex/pom.xml | 6 +-
.../apache/beam/runners/apex/ApexRunner.java | 5 +-
.../beam/runners/apex/ApexRunnerRegistrar.java | 61 ++++++++++++++++++++
.../apex/translators/utils/ApexStreamTuple.java | 2 +-
.../runners/apex/ApexRunnerRegistrarTest.java | 47 +++++++++++++++
5 files changed, 116 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 8b62410..de191f5 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -138,11 +138,11 @@
<scope>test</scope>
</dependency-->
<!-- Optional Pipeline Registration -->
- <!--dependency>
+ <dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<optional>true</optional>
- </dependency-->
+ </dependency>
</dependencies>
<build>
@@ -183,7 +183,7 @@
<systemPropertyVariables>
<beamTestPipelineOptions>
[
- "--runner=org.apache.beam.runners.apex.TestApexRunner"
+ "--runner=TestApexRunner"
]
</beamTestPipelineOptions>
</systemPropertyVariables>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index f3c44bb..8da4ec3 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.apex.translators.TranslationContext;
import org.apache.beam.runners.core.AssignWindows;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
@@ -75,7 +76,9 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
public static ApexRunner fromOptions(PipelineOptions options) {
- return new ApexRunner((ApexPipelineOptions) options);
+ ApexPipelineOptions apexPipelineOptions =
+ PipelineOptionsValidator.validate(ApexPipelineOptions.class, options);
+ return new ApexRunner(apexPipelineOptions);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
new file mode 100644
index 0000000..aa6ef45
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link ApexRunner}.
+ *
+ * {@link AutoService} will register Apex's implementations of the {@link PipelineRunner}
+ * and {@link PipelineOptions} as available pipeline runner services.
+ */
+public final class ApexRunnerRegistrar {
+ private ApexRunnerRegistrar() {}
+
+ /**
+ * Registers the {@link ApexRunner}.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class Runner implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList
+ .<Class<? extends PipelineRunner<?>>>of(ApexRunner.class, TestApexRunner.class);
+ }
+ }
+
+ /**
+ * Registers the {@link ApexPipelineOptions}.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class Options implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(ApexPipelineOptions.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
index a260a66..7f8b0fa 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -134,7 +134,7 @@ public interface ApexStreamTuple<T> {
/**
* Coder for {@link ApexStreamTuple}.
*/
- public static class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
+ class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
private static final long serialVersionUID = 1L;
final Coder<T> valueCoder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java
new file mode 100644
index 0000000..7af5585
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+/**
+ * Tests the proper registration of the Apex runner.
+ */
+public class ApexRunnerRegistrarTest {
+
+ @Test
+ public void testFullName() {
+ String[] args =
+ new String[] {String.format("--runner=%s", ApexRunner.class.getName())};
+ PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+ assertEquals(opts.getRunner(), ApexRunner.class);
+ }
+
+ @Test
+ public void testClassName() {
+ String[] args =
+ new String[] {String.format("--runner=%s", ApexRunner.class.getSimpleName())};
+ PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+ assertEquals(opts.getRunner(), ApexRunner.class);
+ }
+
+}