You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/26 16:44:26 UTC
[48/50] [abbrv] incubator-beam git commit: remove "pipeline" in runner name
remove "pipeline" in runner name
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/94bd47cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/94bd47cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/94bd47cd
Branch: refs/heads/gearpump-runner
Commit: 94bd47cdb7e4b8f1d874ace1c60e4251636a8110
Parents: 8f013cb
Author: manuzhang <ow...@gmail.com>
Authored: Wed Oct 26 16:18:39 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Oct 26 16:19:13 2016 +0800
----------------------------------------------------------------------
.../gearpump/GearpumpPipelineRunner.java | 191 -------------------
.../GearpumpPipelineRunnerRegistrar.java | 62 ------
.../beam/runners/gearpump/GearpumpRunner.java | 191 +++++++++++++++++++
.../gearpump/GearpumpRunnerRegistrar.java | 62 ++++++
.../runners/gearpump/TestGearpumpRunner.java | 4 +-
.../gearpump/examples/StreamingWordCount.java | 4 +-
6 files changed, 257 insertions(+), 257 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
deleted file mode 100644
index 9e32227..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.gearpump;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.beam.runners.core.AssignWindows;
-import org.apache.beam.runners.gearpump.translators.TranslationContext;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import org.apache.gearpump.cluster.ClusterConfig;
-import org.apache.gearpump.cluster.UserConfig;
-import org.apache.gearpump.cluster.client.ClientContext;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
-
-/**
- * A {@link PipelineRunner} that executes the operations in the
- * pipeline by first translating them to Gearpump Stream DSL
- * and then executing them on a Gearpump cluster.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResult> {
-
- private final GearpumpPipelineOptions options;
-
- private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers";
- private static final String DEFAULT_APPNAME = "beam_gearpump_app";
-
- public GearpumpPipelineRunner(GearpumpPipelineOptions options) {
- this.options = options;
- }
-
- public static GearpumpPipelineRunner fromOptions(PipelineOptions options) {
- GearpumpPipelineOptions pipelineOptions =
- PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options);
- return new GearpumpPipelineRunner(pipelineOptions);
- }
-
-
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- if (Window.Bound.class.equals(transform.getClass())) {
- return (OutputT) super.apply(
- new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
- } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
- && ((PCollectionList<?>) input).size() == 0) {
- return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
- } else if (Create.Values.class.equals(transform.getClass())) {
- return (OutputT) PCollection
- .<OutputT>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.BOUNDED);
- } else {
- return super.apply(transform, input);
- }
- }
-
- @Override
- public GearpumpPipelineResult run(Pipeline pipeline) {
- String appName = options.getApplicationName();
- if (null == appName) {
- appName = DEFAULT_APPNAME;
- }
- Config config = registerSerializers(ClusterConfig.defaultConfig(),
- options.getSerializers());
- ClientContext clientContext = getClientContext(options, config);
- options.setClientContext(clientContext);
- JavaStreamApp streamApp = new JavaStreamApp(
- appName, clientContext, UserConfig.empty());
- TranslationContext translationContext = new TranslationContext(streamApp, options);
- GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
- translator.translate(pipeline);
- streamApp.run();
-
- return null;
- }
-
- private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
- EmbeddedCluster cluster = options.getEmbeddedCluster();
- if (cluster != null) {
- return cluster.newClientContext();
- } else {
- return ClientContext.apply(config);
- }
- }
-
- /**
- * register class with default kryo serializers.
- */
- private Config registerSerializers(Config config, Map<String, String> userSerializers) {
- Map<String, String> serializers = new HashMap<>();
- serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", "");
- serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", "");
- serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", "");
- serializers.put("org.joda.time.Instant", "");
- serializers.put("org.apache.beam.sdk.values.KV", "");
- serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", "");
- serializers.put("org.apache.beam.sdk.values.TimestampedValue", "");
- if (userSerializers != null && !userSerializers.isEmpty()) {
- serializers.putAll(userSerializers);
- }
- return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
- }
-
-
- /**
- * copied from DirectPipelineRunner.
- * used to replace Window.Bound till window function is added to Gearpump Stream DSL
- */
- private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
- extends PTransform<PCollection<T>, PCollection<T>> {
-
- private final Window.Bound<T> wrapped;
-
- AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
- this.wrapped = wrapped;
- }
-
- @Override
- public PCollection<T> apply(PCollection<T> input) {
- WindowingStrategy<?, ?> outputStrategy =
- wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
-
- WindowFn<T, BoundedWindow> windowFn =
- (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
-
- if (!windowFn.isNonMerging()) {
- throw new UnsupportedOperationException(
- "merging window is not supported in Gearpump pipeline");
- }
-
- // If the Window.Bound transform only changed parts other than the WindowFn, then
- // we skip AssignWindows even though it should be harmless in a perfect world.
- // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
- // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
- // AssignWindows in this case.
- if (wrapped.getWindowFn() == null) {
- return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
- .setWindowingStrategyInternal(outputStrategy);
- } else {
- return input
- .apply("AssignWindows", new AssignWindows<>(windowFn))
- .setWindowingStrategyInternal(outputStrategy);
- }
- }
- }
-
- private static class IdentityFn<T> extends OldDoFn<T, T> {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
deleted file mode 100644
index ca173d1..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-
-/**
- * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
- * {@link GearpumpPipelineRunner}.
- *
- * {@link AutoService} will register Gearpump's implementations of the {@link PipelineRunner}
- * and {@link PipelineOptions} as available pipeline runner services.
- */
-public class GearpumpPipelineRunnerRegistrar {
- private GearpumpPipelineRunnerRegistrar() { }
-
- /**
- * Registers the {@link GearpumpPipelineRunner}.
- */
- @AutoService(PipelineRunnerRegistrar.class)
- public static class Runner implements PipelineRunnerRegistrar {
-
- @Override
- public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>>of(TestGearpumpRunner.class);
- }
- }
-
- /**
- * Registers the {@link GearpumpPipelineOptions}.
- */
- @AutoService(PipelineOptionsRegistrar.class)
- public static class Options implements PipelineOptionsRegistrar {
-
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>of(GearpumpPipelineOptions.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
new file mode 100644
index 0000000..ed0813d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.runners.core.AssignWindows;
+import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to Gearpump Stream DSL
+ * and then executing them on a Gearpump cluster.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
+
+ private final GearpumpPipelineOptions options;
+
+ private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers";
+ private static final String DEFAULT_APPNAME = "beam_gearpump_app";
+
+ public GearpumpRunner(GearpumpPipelineOptions options) {
+ this.options = options;
+ }
+
+ public static GearpumpRunner fromOptions(PipelineOptions options) {
+ GearpumpPipelineOptions pipelineOptions =
+ PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options);
+ return new GearpumpRunner(pipelineOptions);
+ }
+
+
+ public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+ PTransform<InputT, OutputT> transform, InputT input) {
+ if (Window.Bound.class.equals(transform.getClass())) {
+ return (OutputT) super.apply(
+ new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
+ } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
+ && ((PCollectionList<?>) input).size() == 0) {
+ return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
+ } else if (Create.Values.class.equals(transform.getClass())) {
+ return (OutputT) PCollection
+ .<OutputT>createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.BOUNDED);
+ } else {
+ return super.apply(transform, input);
+ }
+ }
+
+ @Override
+ public GearpumpPipelineResult run(Pipeline pipeline) {
+ String appName = options.getApplicationName();
+ if (null == appName) {
+ appName = DEFAULT_APPNAME;
+ }
+ Config config = registerSerializers(ClusterConfig.defaultConfig(),
+ options.getSerializers());
+ ClientContext clientContext = getClientContext(options, config);
+ options.setClientContext(clientContext);
+ JavaStreamApp streamApp = new JavaStreamApp(
+ appName, clientContext, UserConfig.empty());
+ TranslationContext translationContext = new TranslationContext(streamApp, options);
+ GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
+ translator.translate(pipeline);
+ streamApp.run();
+
+ return null;
+ }
+
+ private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
+ EmbeddedCluster cluster = options.getEmbeddedCluster();
+ if (cluster != null) {
+ return cluster.newClientContext();
+ } else {
+ return ClientContext.apply(config);
+ }
+ }
+
+ /**
+ * register class with default kryo serializers.
+ */
+ private Config registerSerializers(Config config, Map<String, String> userSerializers) {
+ Map<String, String> serializers = new HashMap<>();
+ serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", "");
+ serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", "");
+ serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", "");
+ serializers.put("org.joda.time.Instant", "");
+ serializers.put("org.apache.beam.sdk.values.KV", "");
+ serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", "");
+ serializers.put("org.apache.beam.sdk.values.TimestampedValue", "");
+ if (userSerializers != null && !userSerializers.isEmpty()) {
+ serializers.putAll(userSerializers);
+ }
+ return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
+ }
+
+
+ /**
+ * copied from DirectPipelineRunner.
+ * used to replace Window.Bound till window function is added to Gearpump Stream DSL
+ */
+ private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
+ extends PTransform<PCollection<T>, PCollection<T>> {
+
+ private final Window.Bound<T> wrapped;
+
+ AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public PCollection<T> apply(PCollection<T> input) {
+ WindowingStrategy<?, ?> outputStrategy =
+ wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
+
+ WindowFn<T, BoundedWindow> windowFn =
+ (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+
+ if (!windowFn.isNonMerging()) {
+ throw new UnsupportedOperationException(
+ "merging window is not supported in Gearpump pipeline");
+ }
+
+ // If the Window.Bound transform only changed parts other than the WindowFn, then
+ // we skip AssignWindows even though it should be harmless in a perfect world.
+ // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
+ // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
+ // AssignWindows in this case.
+ if (wrapped.getWindowFn() == null) {
+ return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
+ .setWindowingStrategyInternal(outputStrategy);
+ } else {
+ return input
+ .apply("AssignWindows", new AssignWindows<>(windowFn))
+ .setWindowingStrategyInternal(outputStrategy);
+ }
+ }
+ }
+
+ private static class IdentityFn<T> extends OldDoFn<T, T> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
new file mode 100644
index 0000000..b77e1e3
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link GearpumpRunner}.
+ *
+ * {@link AutoService} will register Gearpump's implementations of the {@link PipelineRunner}
+ * and {@link PipelineOptions} as available pipeline runner services.
+ */
+public class GearpumpRunnerRegistrar {
+ private GearpumpRunnerRegistrar() { }
+
+ /**
+ * Registers the {@link GearpumpRunner}.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class Runner implements PipelineRunnerRegistrar {
+
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(TestGearpumpRunner.class);
+ }
+ }
+
+ /**
+ * Registers the {@link GearpumpPipelineOptions}.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class Options implements PipelineOptionsRegistrar {
+
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(GearpumpPipelineOptions.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
index cedd31f..89d31a6 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -33,14 +33,14 @@ import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
*/
public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
- private final GearpumpPipelineRunner delegate;
+ private final GearpumpRunner delegate;
private final EmbeddedCluster cluster;
private TestGearpumpRunner(GearpumpPipelineOptions options) {
cluster = EmbeddedCluster.apply();
cluster.start();
options.setEmbeddedCluster(cluster);
- delegate = GearpumpPipelineRunner.fromOptions(options);
+ delegate = GearpumpRunner.fromOptions(options);
}
public static TestGearpumpRunner fromOptions(PipelineOptions options) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
index ba50de7..1d85c25 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
@@ -19,7 +19,7 @@
package org.apache.beam.runners.gearpump.examples;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.GearpumpPipelineRunner;
+import org.apache.beam.runners.gearpump.GearpumpRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -75,7 +75,7 @@ public class StreamingWordCount {
public static void main(String[] args) {
GearpumpPipelineOptions options = PipelineOptionsFactory
.fromArgs(args).as(GearpumpPipelineOptions.class);
- options.setRunner(GearpumpPipelineRunner.class);
+ options.setRunner(GearpumpRunner.class);
options.setApplicationName("StreamingWordCount");
options.setParallelism(1);