You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/01/07 21:47:15 UTC
[beam] branch master updated: [BEAM-6363] Upgrade Gearpump to 0.9.0
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8da36cf [BEAM-6363] Upgrade Gearpump to 0.9.0
8da36cf is described below
commit 8da36cfd11b05d085e4a7e6e77c3de6e1f1df54e
Author: manuzhang <ow...@gmail.com>
AuthorDate: Tue Dec 4 07:59:53 2018 +0800
[BEAM-6363] Upgrade Gearpump to 0.9.0
---
runners/gearpump/build.gradle | 8 +++---
.../runners/gearpump/GearpumpPipelineOptions.java | 12 ++++----
.../runners/gearpump/GearpumpPipelineResult.java | 20 ++++++++------
.../beam/runners/gearpump/GearpumpRunner.java | 32 +++++++++++-----------
.../beam/runners/gearpump/TestGearpumpRunner.java | 19 ++++---------
.../CreateGearpumpPCollectionViewTranslator.java | 2 +-
.../translators/FlattenPCollectionsTranslator.java | 4 +--
.../translators/GearpumpPipelineTranslator.java | 2 +-
.../gearpump/translators/GroupByKeyTranslator.java | 18 ++++++------
.../translators/ParDoMultiOutputTranslator.java | 4 +--
.../translators/ReadBoundedTranslator.java | 4 +--
.../translators/ReadUnboundedTranslator.java | 4 +--
.../gearpump/translators/TranslationContext.java | 8 +++---
.../translators/WindowAssignTranslator.java | 4 +--
.../translators/functions/DoFnFunction.java | 11 ++++++--
.../gearpump/translators/io/GearpumpSource.java | 10 +++----
.../translators/utils/TranslatorUtils.java | 15 ++++++----
.../beam/runners/gearpump/PipelineOptionsTest.java | 7 ++---
...reateGearpumpPCollectionViewTranslatorTest.java | 2 +-
.../FlattenPCollectionsTranslatorTest.java | 6 ++--
.../translators/GroupByKeyTranslatorTest.java | 4 +--
.../translators/ReadBoundedTranslatorTest.java | 4 +--
.../translators/ReadUnboundedTranslatorTest.java | 4 +--
.../translators/io/GearpumpSourceTest.java | 6 ++--
.../gearpump/translators/io/ValueSoureTest.java | 10 ++-----
.../translators/utils/TranslatorUtilsTest.java | 2 +-
26 files changed, 110 insertions(+), 112 deletions(-)
diff --git a/runners/gearpump/build.gradle b/runners/gearpump/build.gradle
index 3a0ceb1..bd1efa9 100644
--- a/runners/gearpump/build.gradle
+++ b/runners/gearpump/build.gradle
@@ -30,7 +30,7 @@ description = "Apache Beam :: Runners :: Gearpump"
*/
evaluationDependsOn(":beam-sdks-java-core")
-def gearpump_version = "0.8.4"
+def gearpump_version = "0.9.0"
configurations {
validatesRunner
@@ -39,12 +39,12 @@ configurations {
dependencies {
compile library.java.guava
compileOnly "com.typesafe:config:1.3.0"
- compileOnly "org.scala-lang:scala-library:2.11.8"
+ compileOnly "org.scala-lang:scala-library:2.12.7"
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow project(path: ":beam-runners-core-java", configuration: "shadow")
shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow")
- shadow "org.apache.gearpump:gearpump-core_2.11:$gearpump_version"
- shadow "org.apache.gearpump:gearpump-streaming_2.11:$gearpump_version"
+ shadow "io.github.gearpump:gearpump-core_2.12:$gearpump_version:assembly"
+ shadow "io.github.gearpump:gearpump-streaming_2.12:$gearpump_version:assembly"
shadow library.java.joda_time
shadow library.java.jackson_annotations
shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
index aaf3f1e..eadfc44 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
@@ -18,12 +18,12 @@
package org.apache.beam.runners.gearpump;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import io.gearpump.cluster.client.ClientContext;
import java.util.Map;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.gearpump.cluster.client.ClientContext;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+
/** Options that configure the Gearpump pipeline. */
public interface GearpumpPipelineOptions extends PipelineOptions {
@@ -45,11 +45,11 @@ public interface GearpumpPipelineOptions extends PipelineOptions {
@JsonIgnore
Map<String, String> getSerializers();
- @Description("set EmbeddedCluster for tests")
- void setEmbeddedCluster(EmbeddedCluster cluster);
+ @Description("Whether the pipeline will be run on a remote cluster. If false, it will be run on a EmbeddedCluster")
+ void setRemote(Boolean remote);
- @JsonIgnore
- EmbeddedCluster getEmbeddedCluster();
+ @Default.Boolean(true)
+ Boolean getRemote();
void setClientContext(ClientContext clientContext);
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 92d0b94..7f9b0be 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -1,4 +1,4 @@
-/*
+ /*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,15 +17,15 @@
*/
package org.apache.beam.runners.gearpump;
+import io.gearpump.cluster.ApplicationStatus;
+import io.gearpump.cluster.MasterToAppMaster.AppMasterData;
+import io.gearpump.cluster.client.ClientContext;
+import io.gearpump.cluster.client.RunningApplication;
import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.gearpump.cluster.ApplicationStatus;
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData;
-import org.apache.gearpump.cluster.client.ClientContext;
-import org.apache.gearpump.cluster.client.RunningApplication;
import org.joda.time.Duration;
import scala.collection.JavaConverters;
import scala.collection.Seq;
@@ -82,6 +82,10 @@ public class GearpumpPipelineResult implements PipelineResult {
String.format("%s does not support querying metrics", getClass().getSimpleName()));
}
+ public ClientContext getClientContext() {
+ return client;
+ }
+
private State getGearpumpState() {
ApplicationStatus status = null;
List<AppMasterData> apps =
@@ -92,11 +96,11 @@ public class GearpumpPipelineResult implements PipelineResult {
status = appData.status();
}
}
- if (null == status || status instanceof ApplicationStatus.NONEXIST$) {
+ if (null == status || status.status().equals("nonexist")) {
return State.UNKNOWN;
- } else if (status instanceof ApplicationStatus.ACTIVE$) {
+ } else if (status.status().equals("active")) {
return State.RUNNING;
- } else if (status instanceof ApplicationStatus.SUCCEEDED$) {
+ } else if (status.status().equals("succeeded")) {
return State.DONE;
} else {
return State.FAILED;
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index fb24101..3181b9c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -19,6 +19,15 @@ package org.apache.beam.runners.gearpump;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
+import io.gearpump.cluster.ClusterConfig;
+import io.gearpump.cluster.UserConfig;
+import io.gearpump.cluster.client.ClientContext;
+import io.gearpump.cluster.client.RemoteRuntimeEnvironment;
+import io.gearpump.cluster.client.RunningApplication;
+import io.gearpump.cluster.client.RuntimeEnvironment;
+import io.gearpump.cluster.embedded.EmbeddedRuntimeEnvironment;
+import io.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import io.gearpump.util.Constants;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.gearpump.translators.GearpumpPipelineTranslator;
@@ -27,12 +36,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.gearpump.cluster.ClusterConfig;
-import org.apache.gearpump.cluster.UserConfig;
-import org.apache.gearpump.cluster.client.ClientContext;
-import org.apache.gearpump.cluster.client.RunningApplication;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
/**
* A {@link PipelineRunner} that executes the operations in the pipeline by first translating them
@@ -63,7 +66,13 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
appName = DEFAULT_APPNAME;
}
Config config = registerSerializers(ClusterConfig.defaultConfig(), options.getSerializers());
- ClientContext clientContext = getClientContext(options, config);
+ if (options.getRemote()) {
+ RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment());
+ } else {
+ RuntimeEnvironment.setRuntimeEnv(new EmbeddedRuntimeEnvironment());
+ config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
+ }
+ ClientContext clientContext = ClientContext.apply(config);
options.setClientContext(clientContext);
UserConfig userConfig = UserConfig.empty();
JavaStreamApp streamApp = new JavaStreamApp(appName, clientContext, userConfig);
@@ -75,15 +84,6 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
return new GearpumpPipelineResult(clientContext, app);
}
- private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
- EmbeddedCluster cluster = options.getEmbeddedCluster();
- if (cluster != null) {
- return cluster.newClientContext();
- } else {
- return ClientContext.apply(config);
- }
- }
-
/** register class with default kryo serializers. */
private Config registerSerializers(Config config, Map<String, String> userSerializers) {
Map<String, String> serializers = new HashMap<>();
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
index a2ddc98..d22be2f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -17,29 +17,19 @@
*/
package org.apache.beam.runners.gearpump;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
+import io.gearpump.cluster.client.ClientContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.gearpump.cluster.ClusterConfig;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import org.apache.gearpump.util.Constants;
-/** Gearpump {@link PipelineRunner} for tests, which uses {@link EmbeddedCluster}. */
+/** Gearpump {@link PipelineRunner} for tests, which uses EmbeddedCluster. */
public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
private final GearpumpRunner delegate;
- private final EmbeddedCluster cluster;
private TestGearpumpRunner(GearpumpPipelineOptions options) {
- Config config = ClusterConfig.master(null);
- config =
- config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
- cluster = new EmbeddedCluster(config);
- cluster.start();
- options.setEmbeddedCluster(cluster);
+ options.setRemote(false);
delegate = GearpumpRunner.fromOptions(options);
}
@@ -53,7 +43,8 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
public GearpumpPipelineResult run(Pipeline pipeline) {
GearpumpPipelineResult result = delegate.run(pipeline);
result.waitUntilFinish();
- cluster.stop();
+ ClientContext client = result.getClientContext();
+ client.close();
return result;
}
}
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
index 64d1752..42bce72 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
@@ -17,10 +17,10 @@
*/
package org.apache.beam.runners.gearpump.translators;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
import java.util.List;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
/** CreateGearpumpPCollectionView bridges input stream to down stream transforms. */
public class CreateGearpumpPCollectionViewTranslator<ElemT, ViewT>
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
index a5ef23f..bc81ea1 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
@@ -18,6 +18,8 @@
package org.apache.beam.runners.gearpump.translators;
import com.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.api.functions.MapFunction;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
import java.util.HashSet;
import java.util.Set;
import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
@@ -26,8 +28,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
/** Flatten.FlattenPCollectionList is translated to Gearpump merge function. */
public class FlattenPCollectionsTranslator<T>
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
index 7f76223..a7ba477 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.gearpump.translators;
import com.google.common.collect.ImmutableList;
+import io.gearpump.util.Graph;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.util.Graph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 55def4b..52e0158 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -19,6 +19,15 @@ package org.apache.beam.runners.gearpump.translators;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.api.functions.FoldFunction;
+import io.gearpump.streaming.dsl.api.functions.MapFunction;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
+import io.gearpump.streaming.dsl.window.api.Discarding$;
+import io.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
+import io.gearpump.streaming.dsl.window.api.WindowFunction;
+import io.gearpump.streaming.dsl.window.api.Windows;
+import io.gearpump.streaming.dsl.window.impl.Window;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -36,15 +45,6 @@ import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
-import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
-import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
-import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
-import org.apache.gearpump.streaming.dsl.window.api.Windows;
-import org.apache.gearpump.streaming.dsl.window.impl.Window;
import org.joda.time.Instant;
/** {@link GroupByKey} is translated to Gearpump groupBy function. */
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
index c076cec..09172e2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.gearpump.translators;
+import io.gearpump.streaming.dsl.api.functions.FilterFunction;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -32,8 +34,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
/**
* {@link ParDo.MultiOutput} is translated to Gearpump flatMap function with {@link DoFn} wrapped in
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
index 546f354..f508f05 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
@@ -17,12 +17,12 @@
*/
package org.apache.beam.runners.gearpump.translators;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.source.DataSource;
import org.apache.beam.runners.gearpump.translators.io.BoundedSourceWrapper;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.source.DataSource;
/**
* {@link Read.Bounded} is translated to Gearpump source function and {@link BoundedSource} is
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
index d082ae3..b508539 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
@@ -17,12 +17,12 @@
*/
package org.apache.beam.runners.gearpump.translators;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.source.DataSource;
import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.source.DataSource;
/**
* {@link Read.Unbounded} is translated to Gearpump source function and {@link UnboundedSource} is
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index b3a962d..840ed2e 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -20,6 +20,10 @@ package org.apache.beam.runners.gearpump.translators;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Iterables;
+import io.gearpump.cluster.UserConfig;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import io.gearpump.streaming.source.DataSource;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.core.construction.TransformInputs;
@@ -29,10 +33,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.gearpump.cluster.UserConfig;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
-import org.apache.gearpump.streaming.source.DataSource;
/** Maintains context data for {@link TransformTranslator}s. */
@SuppressWarnings({"rawtypes", "unchecked"})
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
index 44dbe82..6754d83 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
@@ -18,6 +18,8 @@
package org.apache.beam.runners.gearpump.translators;
import com.google.common.collect.Iterables;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -28,8 +30,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
import org.joda.time.Instant;
/** {@link Window.Assign} is translated to Gearpump flatMap function. */
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 194be4b..1df2b15 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.gearpump.translators.functions;
import com.google.common.collect.Iterables;
+import io.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -44,7 +45,6 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
/** Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}. */
@SuppressWarnings("unchecked")
@@ -96,7 +96,10 @@ public class DoFnFunction<InputT, OutputT>
public void setup() {
sideInputReader = new SideInputHandler(sideInputs, InMemoryStateInternals.<Void>forKey(null));
doFnInvoker = DoFnInvokers.invokerFor(doFn);
- doFnInvoker.invokeSetup();
+
+ if (doFnInvoker != null) {
+ doFnInvoker.invokeSetup();
+ }
doFnRunner = doFnRunnerFactory.createRunner(sideInputReader);
@@ -106,7 +109,9 @@ public class DoFnFunction<InputT, OutputT>
@Override
public void teardown() {
- doFnInvoker.invokeTeardown();
+ if (doFnInvoker != null) {
+ doFnInvoker.invokeTeardown();
+ }
}
@Override
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 5da447f..aaa6184 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -17,6 +17,11 @@
*/
package org.apache.beam.runners.gearpump.translators.io;
+import io.gearpump.DefaultMessage;
+import io.gearpump.Message;
+import io.gearpump.streaming.source.DataSource;
+import io.gearpump.streaming.source.Watermark;
+import io.gearpump.streaming.task.TaskContext;
import java.io.IOException;
import java.time.Instant;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
@@ -26,11 +31,6 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.gearpump.DefaultMessage;
-import org.apache.gearpump.Message;
-import org.apache.gearpump.streaming.source.DataSource;
-import org.apache.gearpump.streaming.source.Watermark;
-import org.apache.gearpump.streaming.task.TaskContext;
/** common methods for {@link BoundedSourceWrapper} and {@link UnboundedSourceWrapper}. */
public abstract class GearpumpSource<T> implements DataSource {
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index f826e4a..08345e2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -18,6 +18,10 @@
package org.apache.beam.runners.gearpump.translators.utils;
import com.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.api.functions.FoldFunction;
+import io.gearpump.streaming.dsl.api.functions.MapFunction;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.dsl.window.impl.Window;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
@@ -29,10 +33,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.window.impl.Window;
/** Utility methods for translators. */
public class TranslatorUtils {
@@ -143,7 +143,12 @@ public class TranslatorUtils {
private final String unionTag;
private final Object value;
- /** Constructs a partial union from the given union tag and value. */
+ /**
+ * Constructs a partial union from the given union tag and value.
+ *
+ * @param unionTag tag of union
+ * @param value value of union
+ */
public RawUnionValue(String unionTag, Object value) {
this.unionTag = unionTag;
this.value = value;
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
index 801a2ea..7e2c5be 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
@@ -23,13 +23,12 @@ import static org.junit.Assert.assertNull;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import io.gearpump.cluster.ClusterConfig;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.gearpump.cluster.ClusterConfig;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
import org.junit.Test;
/** Tests for {@link GearpumpPipelineOptions}. */
@@ -43,10 +42,9 @@ public class PipelineOptionsTest {
GearpumpPipelineOptions options =
PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
Config config = ClusterConfig.master(null);
- EmbeddedCluster cluster = new EmbeddedCluster(config);
options.setSerializers(serializers);
options.setApplicationName(appName);
- options.setEmbeddedCluster(cluster);
+ options.setRemote(false);
options.setParallelism(10);
byte[] serializedOptions = serialize(options);
@@ -55,7 +53,6 @@ public class PipelineOptionsTest {
.readValue(serializedOptions, PipelineOptions.class)
.as(GearpumpPipelineOptions.class);
- assertNull(deserializedOptions.getEmbeddedCluster());
assertNull(deserializedOptions.getSerializers());
assertEquals(10, deserializedOptions.getParallelism());
assertEquals(appName, deserializedOptions.getApplicationName());
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
index 7ba8bb1..b580a24 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
@@ -22,9 +22,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.junit.Test;
/** Tests for {@link CreateGearpumpPCollectionViewTranslator}. */
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
index 39e51cb..13b8544 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
@@ -25,6 +25,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import io.gearpump.streaming.dsl.api.functions.MapFunction;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.source.DataSource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -35,9 +38,6 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.source.DataSource;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
index 2abcc25..e80d671 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.window.api.WindowFunction;
+import io.gearpump.streaming.dsl.window.impl.Window;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
@@ -35,8 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
-import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
-import org.apache.gearpump.streaming.dsl.window.impl.Window;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
index 3e41bc4..b09d096 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
@@ -23,14 +23,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.source.DataSource;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.runners.gearpump.translators.io.BoundedSourceWrapper;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.source.DataSource;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
index 7a5f7ad..b828f9c 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
@@ -23,14 +23,14 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.source.DataSource;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.source.DataSource;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
index d2a4540..619a478 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
@@ -18,6 +18,9 @@
package org.apache.beam.runners.gearpump.translators.io;
import com.google.common.collect.Lists;
+import io.gearpump.DefaultMessage;
+import io.gearpump.Message;
+import io.gearpump.streaming.source.Watermark;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
@@ -30,9 +33,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.gearpump.DefaultMessage;
-import org.apache.gearpump.Message;
-import org.apache.gearpump.streaming.source.Watermark;
import org.junit.Assert;
import org.junit.Test;
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
index e63ba78..b08d547 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
@@ -21,6 +21,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
+import io.gearpump.cluster.ClusterConfig;
+import io.gearpump.util.Constants;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -33,9 +35,6 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.gearpump.cluster.ClusterConfig;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import org.apache.gearpump.util.Constants;
import org.junit.Assert;
import org.junit.Test;
@@ -49,10 +48,8 @@ public class ValueSoureTest {
Config config = ClusterConfig.master(null);
config =
config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
- EmbeddedCluster cluster = new EmbeddedCluster(config);
- cluster.start();
- options.setEmbeddedCluster(cluster);
+ options.setRemote(false);
options.setRunner(GearpumpRunner.class);
options.setParallelism(1);
Pipeline p = Pipeline.create(options);
@@ -61,7 +58,6 @@ public class ValueSoureTest {
p.apply(Read.from(source)).apply(ParDo.of(new ResultCollector()));
p.run().waitUntilFinish();
- cluster.stop();
Assert.assertEquals(Sets.newHashSet(values), ResultCollector.RESULTS);
}
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
index 0f661ef..7932a8c 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
@@ -21,13 +21,13 @@ import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import com.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.window.impl.Window;
import java.time.Instant;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
-import org.apache.gearpump.streaming.dsl.window.impl.Window;
import org.junit.Test;
/** Tests for {@link TranslatorUtils}. */