You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/12/07 19:23:26 UTC

[beam] branch master updated: [BEAM-6194] Follow up with cleanup for https://github.com/apache/beam/pull/7015 (#7219)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a21b196  [BEAM-6194] Follow up with cleanup for https://github.com/apache/beam/pull/7015 (#7219)
a21b196 is described below

commit a21b1965ca37b885cca6d6d2c0a94690daafc9ef
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Fri Dec 7 11:23:19 2018 -0800

    [BEAM-6194] Follow up with cleanup for https://github.com/apache/beam/pull/7015 (#7219)
---
 .../runners/direct/portable/ReferenceRunner.java     | 13 ++++++-------
 .../portable/RemoteStageEvaluatorFactoryTest.java    |  3 ++-
 .../worker/BeamFnMapTaskExecutorFactory.java         | 20 --------------------
 .../fn/control/ProcessRemoteBundleOperation.java     |  5 ++---
 .../graph/CreateExecutableStageNodeFunction.java     |  5 +++--
 .../worker/IntrinsicMapTaskExecutorFactoryTest.java  |  6 +++---
 .../SingleEnvironmentInstanceJobBundleFactory.java   |  7 +------
 .../environment/StaticRemoteEnvironment.java         | 16 +++++++---------
 .../environment/StaticRemoteEnvironmentFactory.java  | 12 +++++-------
 ...ingleEnvironmentInstanceJobBundleFactoryTest.java |  3 ++-
 10 files changed, 31 insertions(+), 59 deletions(-)

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 1d1c56c..e532355 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -82,6 +82,7 @@ import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
 import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -97,6 +98,8 @@ public class ReferenceRunner {
 
   private final EnvironmentType environmentType;
 
+  private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
+
   private ReferenceRunner(
       Pipeline p, Struct options, @Nullable File artifactsDir, EnvironmentType environmentType) {
     this.pipeline = executable(p);
@@ -197,7 +200,8 @@ public class ReferenceRunner {
       EnvironmentFactory environmentFactory =
           createEnvironmentFactory(control, logging, artifact, provisioning, controlClientPool);
       JobBundleFactory jobBundleFactory =
-          SingleEnvironmentInstanceJobBundleFactory.create(environmentFactory, data, state, null);
+          SingleEnvironmentInstanceJobBundleFactory.create(
+              environmentFactory, data, state, idGenerator);
 
       TransformEvaluatorRegistry transformRegistry =
           TransformEvaluatorRegistry.portableRegistry(
@@ -238,12 +242,7 @@ public class ReferenceRunner {
       case DOCKER:
         return new DockerEnvironmentFactory.Provider(PipelineOptionsTranslation.fromProto(options))
             .createEnvironmentFactory(
-                control,
-                logging,
-                artifact,
-                provisioning,
-                controlClient,
-                IdGenerators.incrementingLongs());
+                control, logging, artifact, provisioning, controlClient, idGenerator);
       case IN_PROCESS:
         return EmbeddedEnvironmentFactory.create(
             PipelineOptionsFactory.create(), logging, control, controlClient.getSource());
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
index 2dfaac9..d90d66d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
@@ -50,6 +50,7 @@ import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
 import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -111,7 +112,7 @@ public class RemoteStageEvaluatorFactoryTest implements Serializable {
     bundleFactory = ImmutableListBundleFactory.create();
     JobBundleFactory jobBundleFactory =
         SingleEnvironmentInstanceJobBundleFactory.create(
-            environmentFactory, dataServer, stateServer, null);
+            environmentFactory, dataServer, stateServer, IdGenerators.incrementingLongs());
     factory = new RemoteStageEvaluatorFactory(bundleFactory, jobBundleFactory);
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 7db5339..78bba56 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -148,26 +148,6 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
     Networks.replaceDirectedNetworkNodes(
         network, createOutputReceiversTransform(stageName, counterSet));
 
-    // Swap out all the RegisterFnRequest nodes with Operation nodes
-    Networks.replaceDirectedNetworkNodes(
-        network,
-        createOperationTransformForRegisterFnNodes(
-            idGenerator,
-            instructionRequestHandler,
-            grpcStateFnServer.getService(),
-            stageName,
-            executionContext));
-
-    // Swap out all the RemoteGrpcPort nodes with Operation nodes, note that it is expected
-    // that the RegisterFnRequest nodes have already been replaced.
-    Networks.replaceDirectedNetworkNodes(
-        network,
-        createOperationTransformForGrpcPortNodes(
-            network,
-            grpcDataFnServer.getService(),
-            // TODO: Set NameContext properly for these operations.
-            executionContext.createOperationContext(
-                NameContext.create(stageName, stageName, stageName, stageName))));
     if (DataflowRunner.hasExperiment(
         options.as(DataflowPipelineDebugOptions.class), "use_executable_stage_bundle_execution")) {
       LOG.debug("Using SingleEnvironmentInstanceJobBundleFactory");
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
index 3d44331..ae8a13e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
@@ -35,9 +35,8 @@ import org.slf4j.LoggerFactory;
 /**
  * This {@link org.apache.beam.runners.dataflow.worker.util.common.worker.Operation} is responsible
  * for communicating with the SDK harness and asking it to process a bundle of work. This operation
- * request a RemoteBundle{@link org.apache.beam.runners.fnexecution.control.RemoteBundle}, send data
- * elements to SDK and receive processed results from SDK, then pass these elements to next
- * Operations.
+ * requests a {@link org.apache.beam.runners.fnexecution.control.RemoteBundle}, sends elements to
+ * SDK and receives processed results from SDK, passing these elements downstream.
  */
 public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
   private static final Logger LOG = LoggerFactory.getLogger(ProcessRemoteBundleOperation.class);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index ab8cd0b..c93e9e9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -82,8 +82,9 @@ import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * Converts a {@link Network} representation of {@link MapTask} destined for the SDK harness into an
- * {@link Node} containing {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}.
+ * Converts a {@link Network} representation of {@link MapTask} destined for the SDK harness into a
+ * {@link Node} containing an {@link
+ * org.apache.beam.runners.core.construction.graph.ExecutableStage}.
  */
 public class CreateExecutableStageNodeFunction
     implements Function<MutableNetwork<Node, Edge>, Node> {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 6b12b21..182ad50 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -173,9 +173,9 @@ public class IntrinsicMapTaskExecutorFactoryTest {
     try (DataflowMapTaskExecutor executor =
         mapTaskExecutorFactory.create(
             null /* beamFnControlClientHandler */,
-            null /* beamFnDataService */,
-            null /* beamFnStateService */,
-            null,
+            null /* GrpcFnServer<GrpcDataService> */,
+            null /* ApiServiceDescriptor */,
+            null, /* GrpcFnServer<GrpcStateService> */
             mapTaskToNetwork.apply(mapTask),
             options,
             STAGE,
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
index 23b752b..b7d0c58 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
@@ -35,7 +35,6 @@ import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.IdGenerator;
-import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.util.WindowedValue;
 
@@ -78,11 +77,7 @@ public class SingleEnvironmentInstanceJobBundleFactory implements JobBundleFacto
     this.environmentFactory = environmentFactory;
     this.dataService = dataService;
     this.stateService = stateService;
-    if (idGenerator != null) {
-      this.idGenerator = idGenerator;
-    } else {
-      this.idGenerator = IdGenerators.incrementingLongs();
-    }
+    this.idGenerator = idGenerator;
   }
 
   @Override
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
index 655a2e7..79c68ab 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
@@ -30,7 +30,6 @@ public class StaticRemoteEnvironment implements RemoteEnvironment {
     return new StaticRemoteEnvironment(environment, instructionRequestHandler);
   }
 
-  private final Object lock = new Object();
   private final Environment environment;
   private final InstructionRequestHandler instructionRequestHandler;
 
@@ -53,14 +52,13 @@ public class StaticRemoteEnvironment implements RemoteEnvironment {
   }
 
   @Override
-  public void close() throws Exception {
-    synchronized (lock) {
-      // The running docker container and instruction handler should each only be terminated once.
-      // Do nothing if we have already requested termination.
-      if (!isClosed) {
-        isClosed = true;
-        this.instructionRequestHandler.close();
-      }
+  public synchronized void close() throws Exception {
+    // The instruction handler should only be terminated once, so repeated calls are no-ops.
+    // Do nothing if we have already requested termination.
+    if (isClosed) {
+      return;
     }
+    isClosed = true;
+    this.instructionRequestHandler.close();
   }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
index 5b35462..29a0721 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
@@ -28,20 +28,18 @@ import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionServi
 import org.apache.beam.sdk.fn.IdGenerator;
 
 /**
- * An {@link EnvironmentFactory} that creates StaticRemoteEnvironment used by Dataflow runner
- * harness.
+ * An {@link EnvironmentFactory} that creates StaticRemoteEnvironment used by a runner harness that
+ * would like to use an existing InstructionRequestHandler.
  */
 public class StaticRemoteEnvironmentFactory implements EnvironmentFactory {
   public static StaticRemoteEnvironmentFactory forService(
       InstructionRequestHandler instructionRequestHandler) {
-    StaticRemoteEnvironmentFactory factory = new StaticRemoteEnvironmentFactory();
-    factory.setStaticServiceContent(instructionRequestHandler);
-    return factory;
+    return new StaticRemoteEnvironmentFactory(instructionRequestHandler);
   }
 
-  private InstructionRequestHandler instructionRequestHandler;
+  private final InstructionRequestHandler instructionRequestHandler;
 
-  private void setStaticServiceContent(InstructionRequestHandler instructionRequestHandler) {
+  private StaticRemoteEnvironmentFactory(InstructionRequestHandler instructionRequestHandler) {
     this.instructionRequestHandler = instructionRequestHandler;
   }
 
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
index 30e1bfe..a795e3c 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
 import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.junit.After;
@@ -84,7 +85,7 @@ public class SingleEnvironmentInstanceJobBundleFactoryTest {
 
     factory =
         SingleEnvironmentInstanceJobBundleFactory.create(
-            environmentFactory, dataServer, stateServer, null);
+            environmentFactory, dataServer, stateServer, IdGenerators.incrementingLongs());
   }
 
   @After