You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/01/07 21:47:15 UTC

[beam] branch master updated: [BEAM-6363] Upgrade Gearpump to 0.9.0

This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8da36cf  [BEAM-6363] Upgrade Gearpump to 0.9.0
8da36cf is described below

commit 8da36cfd11b05d085e4a7e6e77c3de6e1f1df54e
Author: manuzhang <ow...@gmail.com>
AuthorDate: Tue Dec 4 07:59:53 2018 +0800

    [BEAM-6363] Upgrade Gearpump to 0.9.0
---
 runners/gearpump/build.gradle                      |  8 +++---
 .../runners/gearpump/GearpumpPipelineOptions.java  | 12 ++++----
 .../runners/gearpump/GearpumpPipelineResult.java   | 20 ++++++++------
 .../beam/runners/gearpump/GearpumpRunner.java      | 32 +++++++++++-----------
 .../beam/runners/gearpump/TestGearpumpRunner.java  | 19 ++++---------
 .../CreateGearpumpPCollectionViewTranslator.java   |  2 +-
 .../translators/FlattenPCollectionsTranslator.java |  4 +--
 .../translators/GearpumpPipelineTranslator.java    |  2 +-
 .../gearpump/translators/GroupByKeyTranslator.java | 18 ++++++------
 .../translators/ParDoMultiOutputTranslator.java    |  4 +--
 .../translators/ReadBoundedTranslator.java         |  4 +--
 .../translators/ReadUnboundedTranslator.java       |  4 +--
 .../gearpump/translators/TranslationContext.java   |  8 +++---
 .../translators/WindowAssignTranslator.java        |  4 +--
 .../translators/functions/DoFnFunction.java        | 11 ++++++--
 .../gearpump/translators/io/GearpumpSource.java    | 10 +++----
 .../translators/utils/TranslatorUtils.java         | 15 ++++++----
 .../beam/runners/gearpump/PipelineOptionsTest.java |  7 ++---
 ...reateGearpumpPCollectionViewTranslatorTest.java |  2 +-
 .../FlattenPCollectionsTranslatorTest.java         |  6 ++--
 .../translators/GroupByKeyTranslatorTest.java      |  4 +--
 .../translators/ReadBoundedTranslatorTest.java     |  4 +--
 .../translators/ReadUnboundedTranslatorTest.java   |  4 +--
 .../translators/io/GearpumpSourceTest.java         |  6 ++--
 .../gearpump/translators/io/ValueSoureTest.java    | 10 ++-----
 .../translators/utils/TranslatorUtilsTest.java     |  2 +-
 26 files changed, 110 insertions(+), 112 deletions(-)

diff --git a/runners/gearpump/build.gradle b/runners/gearpump/build.gradle
index 3a0ceb1..bd1efa9 100644
--- a/runners/gearpump/build.gradle
+++ b/runners/gearpump/build.gradle
@@ -30,7 +30,7 @@ description = "Apache Beam :: Runners :: Gearpump"
  */
 evaluationDependsOn(":beam-sdks-java-core")
 
-def gearpump_version = "0.8.4"
+def gearpump_version = "0.9.0"
 
 configurations {
   validatesRunner
@@ -39,12 +39,12 @@ configurations {
 dependencies {
   compile library.java.guava
   compileOnly "com.typesafe:config:1.3.0"
-  compileOnly "org.scala-lang:scala-library:2.11.8"
+  compileOnly "org.scala-lang:scala-library:2.12.7"
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-runners-core-java", configuration: "shadow")
   shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow")
-  shadow "org.apache.gearpump:gearpump-core_2.11:$gearpump_version"
-  shadow "org.apache.gearpump:gearpump-streaming_2.11:$gearpump_version"
+  shadow "io.github.gearpump:gearpump-core_2.12:$gearpump_version:assembly"
+  shadow "io.github.gearpump:gearpump-streaming_2.12:$gearpump_version:assembly"
   shadow library.java.joda_time
   shadow library.java.jackson_annotations
   shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
index aaf3f1e..eadfc44 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineOptions.java
@@ -18,12 +18,12 @@
 package org.apache.beam.runners.gearpump;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import io.gearpump.cluster.client.ClientContext;
 import java.util.Map;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.gearpump.cluster.client.ClientContext;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+
 
 /** Options that configure the Gearpump pipeline. */
 public interface GearpumpPipelineOptions extends PipelineOptions {
@@ -45,11 +45,11 @@ public interface GearpumpPipelineOptions extends PipelineOptions {
   @JsonIgnore
   Map<String, String> getSerializers();
 
-  @Description("set EmbeddedCluster for tests")
-  void setEmbeddedCluster(EmbeddedCluster cluster);
+  @Description("Whether the pipeline will be run on a remote cluster. If false, it will be run on a EmbeddedCluster")
+  void setRemote(Boolean remote);
 
-  @JsonIgnore
-  EmbeddedCluster getEmbeddedCluster();
+  @Default.Boolean(true)
+  Boolean getRemote();
 
   void setClientContext(ClientContext clientContext);
 
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 92d0b94..7f9b0be 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -1,4 +1,4 @@
-/*
+  /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,15 +17,15 @@
  */
 package org.apache.beam.runners.gearpump;
 
+import io.gearpump.cluster.ApplicationStatus;
+import io.gearpump.cluster.MasterToAppMaster.AppMasterData;
+import io.gearpump.cluster.client.ClientContext;
+import io.gearpump.cluster.client.RunningApplication;
 import java.io.IOException;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.gearpump.cluster.ApplicationStatus;
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData;
-import org.apache.gearpump.cluster.client.ClientContext;
-import org.apache.gearpump.cluster.client.RunningApplication;
 import org.joda.time.Duration;
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
@@ -82,6 +82,10 @@ public class GearpumpPipelineResult implements PipelineResult {
         String.format("%s does not support querying metrics", getClass().getSimpleName()));
   }
 
+  public ClientContext getClientContext() {
+    return client;
+  }
+
   private State getGearpumpState() {
     ApplicationStatus status = null;
     List<AppMasterData> apps =
@@ -92,11 +96,11 @@ public class GearpumpPipelineResult implements PipelineResult {
         status = appData.status();
       }
     }
-    if (null == status || status instanceof ApplicationStatus.NONEXIST$) {
+    if (null == status || status.status().equals("nonexist")) {
       return State.UNKNOWN;
-    } else if (status instanceof ApplicationStatus.ACTIVE$) {
+    } else if (status.status().equals("active")) {
       return State.RUNNING;
-    } else if (status instanceof ApplicationStatus.SUCCEEDED$) {
+    } else if (status.status().equals("succeeded")) {
       return State.DONE;
     } else {
       return State.FAILED;
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index fb24101..3181b9c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -19,6 +19,15 @@ package org.apache.beam.runners.gearpump;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
+import io.gearpump.cluster.ClusterConfig;
+import io.gearpump.cluster.UserConfig;
+import io.gearpump.cluster.client.ClientContext;
+import io.gearpump.cluster.client.RemoteRuntimeEnvironment;
+import io.gearpump.cluster.client.RunningApplication;
+import io.gearpump.cluster.client.RuntimeEnvironment;
+import io.gearpump.cluster.embedded.EmbeddedRuntimeEnvironment;
+import io.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import io.gearpump.util.Constants;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.gearpump.translators.GearpumpPipelineTranslator;
@@ -27,12 +36,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.gearpump.cluster.ClusterConfig;
-import org.apache.gearpump.cluster.UserConfig;
-import org.apache.gearpump.cluster.client.ClientContext;
-import org.apache.gearpump.cluster.client.RunningApplication;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
 
 /**
  * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them
@@ -63,7 +66,13 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
       appName = DEFAULT_APPNAME;
     }
     Config config = registerSerializers(ClusterConfig.defaultConfig(), options.getSerializers());
-    ClientContext clientContext = getClientContext(options, config);
+    if (options.getRemote()) {
+      RuntimeEnvironment.setRuntimeEnv(new RemoteRuntimeEnvironment());
+    } else {
+      RuntimeEnvironment.setRuntimeEnv(new EmbeddedRuntimeEnvironment());
+      config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
+    }
+    ClientContext clientContext = ClientContext.apply(config);
     options.setClientContext(clientContext);
     UserConfig userConfig = UserConfig.empty();
     JavaStreamApp streamApp = new JavaStreamApp(appName, clientContext, userConfig);
@@ -75,15 +84,6 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
     return new GearpumpPipelineResult(clientContext, app);
   }
 
-  private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
-    EmbeddedCluster cluster = options.getEmbeddedCluster();
-    if (cluster != null) {
-      return cluster.newClientContext();
-    } else {
-      return ClientContext.apply(config);
-    }
-  }
-
   /** register class with default kryo serializers. */
   private Config registerSerializers(Config config, Map<String, String> userSerializers) {
     Map<String, String> serializers = new HashMap<>();
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
index a2ddc98..d22be2f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -17,29 +17,19 @@
  */
 package org.apache.beam.runners.gearpump;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
+import io.gearpump.cluster.client.ClientContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.gearpump.cluster.ClusterConfig;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import org.apache.gearpump.util.Constants;
 
-/** Gearpump {@link PipelineRunner} for tests, which uses {@link EmbeddedCluster}. */
+/** Gearpump {@link PipelineRunner} for tests, which uses EmbeddedCluster. */
 public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
 
   private final GearpumpRunner delegate;
-  private final EmbeddedCluster cluster;
 
   private TestGearpumpRunner(GearpumpPipelineOptions options) {
-    Config config = ClusterConfig.master(null);
-    config =
-        config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
-    cluster = new EmbeddedCluster(config);
-    cluster.start();
-    options.setEmbeddedCluster(cluster);
+    options.setRemote(false);
     delegate = GearpumpRunner.fromOptions(options);
   }
 
@@ -53,7 +43,8 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
   public GearpumpPipelineResult run(Pipeline pipeline) {
     GearpumpPipelineResult result = delegate.run(pipeline);
     result.waitUntilFinish();
-    cluster.stop();
+    ClientContext client = result.getClientContext();
+    client.close();
     return result;
   }
 }
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
index 64d1752..42bce72 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
@@ -17,10 +17,10 @@
  */
 package org.apache.beam.runners.gearpump.translators;
 
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
 import java.util.List;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 
 /** CreateGearpumpPCollectionView bridges input stream to down stream transforms. */
 public class CreateGearpumpPCollectionViewTranslator<ElemT, ViewT>
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
index a5ef23f..bc81ea1 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.gearpump.translators;
 
 import com.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.api.functions.MapFunction;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
@@ -26,8 +28,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 
 /** Flatten.FlattenPCollectionList is translated to Gearpump merge function. */
 public class FlattenPCollectionsTranslator<T>
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
index 7f76223..a7ba477 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.gearpump.translators;
 
 import com.google.common.collect.ImmutableList;
+import io.gearpump.util.Graph;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.util.Graph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 55def4b..52e0158 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -19,6 +19,15 @@ package org.apache.beam.runners.gearpump.translators;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.api.functions.FoldFunction;
+import io.gearpump.streaming.dsl.api.functions.MapFunction;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
+import io.gearpump.streaming.dsl.window.api.Discarding$;
+import io.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
+import io.gearpump.streaming.dsl.window.api.WindowFunction;
+import io.gearpump.streaming.dsl.window.api.Windows;
+import io.gearpump.streaming.dsl.window.impl.Window;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -36,15 +45,6 @@ import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
-import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
-import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
-import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
-import org.apache.gearpump.streaming.dsl.window.api.Windows;
-import org.apache.gearpump.streaming.dsl.window.impl.Window;
 import org.joda.time.Instant;
 
 /** {@link GroupByKey} is translated to Gearpump groupBy function. */
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
index c076cec..09172e2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.gearpump.translators;
 
+import io.gearpump.streaming.dsl.api.functions.FilterFunction;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -32,8 +34,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 
 /**
  * {@link ParDo.MultiOutput} is translated to Gearpump flatMap function with {@link DoFn} wrapped in
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
index 546f354..f508f05 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
@@ -17,12 +17,12 @@
  */
 package org.apache.beam.runners.gearpump.translators;
 
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.source.DataSource;
 import org.apache.beam.runners.gearpump.translators.io.BoundedSourceWrapper;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.source.DataSource;
 
 /**
  * {@link Read.Bounded} is translated to Gearpump source function and {@link BoundedSource} is
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
index d082ae3..b508539 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
@@ -17,12 +17,12 @@
  */
 package org.apache.beam.runners.gearpump.translators;
 
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.source.DataSource;
 import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.source.DataSource;
 
 /**
  * {@link Read.Unbounded} is translated to Gearpump source function and {@link UnboundedSource} is
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index b3a962d..840ed2e 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -20,6 +20,10 @@ package org.apache.beam.runners.gearpump.translators;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.Iterables;
+import io.gearpump.cluster.UserConfig;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import io.gearpump.streaming.source.DataSource;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.TransformInputs;
@@ -29,10 +33,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.gearpump.cluster.UserConfig;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
-import org.apache.gearpump.streaming.source.DataSource;
 
 /** Maintains context data for {@link TransformTranslator}s. */
 @SuppressWarnings({"rawtypes", "unchecked"})
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
index 44dbe82..6754d83 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.gearpump.translators;
 
 import com.google.common.collect.Iterables;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -28,8 +30,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 import org.joda.time.Instant;
 
 /** {@link Window.Assign} is translated to Gearpump flatMap function. */
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 194be4b..1df2b15 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.gearpump.translators.functions;
 
 import com.google.common.collect.Iterables;
+import io.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -44,7 +45,6 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 
 /** Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}. */
 @SuppressWarnings("unchecked")
@@ -96,7 +96,10 @@ public class DoFnFunction<InputT, OutputT>
   public void setup() {
     sideInputReader = new SideInputHandler(sideInputs, InMemoryStateInternals.<Void>forKey(null));
     doFnInvoker = DoFnInvokers.invokerFor(doFn);
-    doFnInvoker.invokeSetup();
+
+    if (doFnInvoker != null) {
+      doFnInvoker.invokeSetup();
+    }
 
     doFnRunner = doFnRunnerFactory.createRunner(sideInputReader);
 
@@ -106,7 +109,9 @@ public class DoFnFunction<InputT, OutputT>
 
   @Override
   public void teardown() {
-    doFnInvoker.invokeTeardown();
+    if (doFnInvoker != null) {
+      doFnInvoker.invokeTeardown();
+    }
   }
 
   @Override
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 5da447f..aaa6184 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -17,6 +17,11 @@
  */
 package org.apache.beam.runners.gearpump.translators.io;
 
+import io.gearpump.DefaultMessage;
+import io.gearpump.Message;
+import io.gearpump.streaming.source.DataSource;
+import io.gearpump.streaming.source.Watermark;
+import io.gearpump.streaming.task.TaskContext;
 import java.io.IOException;
 import java.time.Instant;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
@@ -26,11 +31,6 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.gearpump.DefaultMessage;
-import org.apache.gearpump.Message;
-import org.apache.gearpump.streaming.source.DataSource;
-import org.apache.gearpump.streaming.source.Watermark;
-import org.apache.gearpump.streaming.task.TaskContext;
 
 /** common methods for {@link BoundedSourceWrapper} and {@link UnboundedSourceWrapper}. */
 public abstract class GearpumpSource<T> implements DataSource {
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index f826e4a..08345e2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -18,6 +18,10 @@
 package org.apache.beam.runners.gearpump.translators.utils;
 
 import com.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.api.functions.FoldFunction;
+import io.gearpump.streaming.dsl.api.functions.MapFunction;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.dsl.window.impl.Window;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.HashMap;
@@ -29,10 +33,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.window.impl.Window;
 
 /** Utility methods for translators. */
 public class TranslatorUtils {
@@ -143,7 +143,12 @@ public class TranslatorUtils {
     private final String unionTag;
     private final Object value;
 
-    /** Constructs a partial union from the given union tag and value. */
+    /**
+     * Constructs a partial union from the given union tag and value.
+     *
+     * @param unionTag tag of union
+     * @param value value of union
+     */
     public RawUnionValue(String unionTag, Object value) {
       this.unionTag = unionTag;
       this.value = value;
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
index 801a2ea..7e2c5be 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/PipelineOptionsTest.java
@@ -23,13 +23,12 @@ import static org.junit.Assert.assertNull;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
+import io.gearpump.cluster.ClusterConfig;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.gearpump.cluster.ClusterConfig;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
 import org.junit.Test;
 
 /** Tests for {@link GearpumpPipelineOptions}. */
@@ -43,10 +42,9 @@ public class PipelineOptionsTest {
     GearpumpPipelineOptions options =
         PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
     Config config = ClusterConfig.master(null);
-    EmbeddedCluster cluster = new EmbeddedCluster(config);
     options.setSerializers(serializers);
     options.setApplicationName(appName);
-    options.setEmbeddedCluster(cluster);
+    options.setRemote(false);
     options.setParallelism(10);
 
     byte[] serializedOptions = serialize(options);
@@ -55,7 +53,6 @@ public class PipelineOptionsTest {
             .readValue(serializedOptions, PipelineOptions.class)
             .as(GearpumpPipelineOptions.class);
 
-    assertNull(deserializedOptions.getEmbeddedCluster());
     assertNull(deserializedOptions.getSerializers());
     assertEquals(10, deserializedOptions.getParallelism());
     assertEquals(appName, deserializedOptions.getApplicationName());
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
index 7ba8bb1..b580a24 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
@@ -22,9 +22,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.junit.Test;
 
 /** Tests for {@link CreateGearpumpPCollectionViewTranslator}. */
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
index 39e51cb..13b8544 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
@@ -25,6 +25,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import io.gearpump.streaming.dsl.api.functions.MapFunction;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.source.DataSource;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,9 +38,6 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.source.DataSource;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
index 2abcc25..e80d671 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.window.api.WindowFunction;
+import io.gearpump.streaming.dsl.window.impl.Window;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.List;
@@ -35,8 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
-import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
-import org.apache.gearpump.streaming.dsl.window.impl.Window;
 import org.joda.time.Duration;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
index 3e41bc4..b09d096 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
@@ -23,14 +23,14 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.source.DataSource;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.runners.gearpump.translators.io.BoundedSourceWrapper;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.source.DataSource;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
index 7a5f7ad..b828f9c 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
@@ -23,14 +23,14 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.source.DataSource;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.source.DataSource;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
index d2a4540..619a478 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
@@ -18,6 +18,9 @@
 package org.apache.beam.runners.gearpump.translators.io;
 
 import com.google.common.collect.Lists;
+import io.gearpump.DefaultMessage;
+import io.gearpump.Message;
+import io.gearpump.streaming.source.Watermark;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
@@ -30,9 +33,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.gearpump.DefaultMessage;
-import org.apache.gearpump.Message;
-import org.apache.gearpump.streaming.source.Watermark;
 import org.junit.Assert;
 import org.junit.Test;
 
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
index e63ba78..b08d547 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
@@ -21,6 +21,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
+import io.gearpump.cluster.ClusterConfig;
+import io.gearpump.util.Constants;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -33,9 +35,6 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.gearpump.cluster.ClusterConfig;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import org.apache.gearpump.util.Constants;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -49,10 +48,8 @@ public class ValueSoureTest {
     Config config = ClusterConfig.master(null);
     config =
         config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
-    EmbeddedCluster cluster = new EmbeddedCluster(config);
-    cluster.start();
 
-    options.setEmbeddedCluster(cluster);
+    options.setRemote(false);
     options.setRunner(GearpumpRunner.class);
     options.setParallelism(1);
     Pipeline p = Pipeline.create(options);
@@ -61,7 +58,6 @@ public class ValueSoureTest {
     p.apply(Read.from(source)).apply(ParDo.of(new ResultCollector()));
 
     p.run().waitUntilFinish();
-    cluster.stop();
 
     Assert.assertEquals(Sets.newHashSet(values), ResultCollector.RESULTS);
   }
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
index 0f661ef..7932a8c 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
@@ -21,13 +21,13 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.window.impl.Window;
 import java.time.Instant;
 import java.util.List;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.values.KV;
-import org.apache.gearpump.streaming.dsl.window.impl.Window;
 import org.junit.Test;
 
 /** Tests for {@link TranslatorUtils}. */