You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:35 UTC

[17/39] incubator-beam git commit: [BEAM-815] Fix ApexPipelineOptions conversion and add ApexRunnerRegistrar

[BEAM-815] Fix ApexPipelineOptions conversion and add ApexRunnerRegistrar


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8f8a80d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8f8a80d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8f8a80d

Branch: refs/heads/master
Commit: c8f8a80d4c6846fd941fbba08727b7a3ecaca7e1
Parents: c9f1406
Author: Ismaël Mejía <ie...@gmail.com>
Authored: Mon Oct 24 11:30:46 2016 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Tue Oct 25 22:36:19 2016 +0200

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  6 +-
 .../apache/beam/runners/apex/ApexRunner.java    |  5 +-
 .../beam/runners/apex/ApexRunnerRegistrar.java  | 61 ++++++++++++++++++++
 .../apex/translators/utils/ApexStreamTuple.java |  2 +-
 .../runners/apex/ApexRunnerRegistrarTest.java   | 47 +++++++++++++++
 5 files changed, 116 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 8b62410..de191f5 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -138,11 +138,11 @@
       <scope>test</scope>
     </dependency-->
     <!-- Optional Pipeline Registration -->
-    <!--dependency>
+    <dependency>
       <groupId>com.google.auto.service</groupId>
       <artifactId>auto-service</artifactId>
       <optional>true</optional>
-    </dependency-->
+    </dependency>
   </dependencies>
 
   <build>
@@ -183,7 +183,7 @@
               <systemPropertyVariables>
                 <beamTestPipelineOptions>
                   [
-                    "--runner=org.apache.beam.runners.apex.TestApexRunner"
+                    "--runner=TestApexRunner"
                   ]
                 </beamTestPipelineOptions>
               </systemPropertyVariables>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index f3c44bb..8da4ec3 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.apex.translators.TranslationContext;
 import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
@@ -75,7 +76,9 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   }
 
   public static ApexRunner fromOptions(PipelineOptions options) {
-    return new ApexRunner((ApexPipelineOptions) options);
+    ApexPipelineOptions apexPipelineOptions =
+            PipelineOptionsValidator.validate(ApexPipelineOptions.class, options);
+    return new ApexRunner(apexPipelineOptions);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
new file mode 100644
index 0000000..aa6ef45
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link ApexRunner}.
+ *
+ * {@link AutoService} will register Apex's implementations of the {@link PipelineRunner}
+ * and {@link PipelineOptions} as available pipeline runner services.
+ */
+public final class ApexRunnerRegistrar {
+    private ApexRunnerRegistrar() {}
+
+    /**
+     * Registers the {@link ApexRunner}.
+     */
+    @AutoService(PipelineRunnerRegistrar.class)
+    public static class Runner implements PipelineRunnerRegistrar {
+        @Override
+        public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+            return ImmutableList
+                    .<Class<? extends PipelineRunner<?>>>of(ApexRunner.class, TestApexRunner.class);
+        }
+    }
+
+    /**
+     * Registers the {@link ApexPipelineOptions}.
+     */
+    @AutoService(PipelineOptionsRegistrar.class)
+    public static class Options implements PipelineOptionsRegistrar {
+        @Override
+        public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+            return ImmutableList.<Class<? extends PipelineOptions>>of(ApexPipelineOptions.class);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
index a260a66..7f8b0fa 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -134,7 +134,7 @@ public interface ApexStreamTuple<T> {
   /**
    * Coder for {@link ApexStreamTuple}.
    */
-  public static class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
+  class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
     private static final long serialVersionUID = 1L;
     final Coder<T> valueCoder;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c8f8a80d/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java
new file mode 100644
index 0000000..7af5585
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerRegistrarTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+/**
+ * Tests the proper registration of the Apex runner.
+ */
+public class ApexRunnerRegistrarTest {
+
+    @Test
+    public void testFullName() {
+        String[] args =
+                new String[] {String.format("--runner=%s", ApexRunner.class.getName())};
+        PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+        assertEquals(opts.getRunner(), ApexRunner.class);
+    }
+
+    @Test
+    public void testClassName() {
+        String[] args =
+                new String[] {String.format("--runner=%s", ApexRunner.class.getSimpleName())};
+        PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+        assertEquals(opts.getRunner(), ApexRunner.class);
+    }
+
+}