You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/12/07 19:23:26 UTC
[beam] branch master updated: [BEAM-6194] Follow up with cleanup
for https://github.com/apache/beam/pull/7015 (#7219)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a21b196 [BEAM-6194] Follow up with cleanup for https://github.com/apache/beam/pull/7015 (#7219)
a21b196 is described below
commit a21b1965ca37b885cca6d6d2c0a94690daafc9ef
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Fri Dec 7 11:23:19 2018 -0800
[BEAM-6194] Follow up with cleanup for https://github.com/apache/beam/pull/7015 (#7219)
---
.../runners/direct/portable/ReferenceRunner.java | 13 ++++++-------
.../portable/RemoteStageEvaluatorFactoryTest.java | 3 ++-
.../worker/BeamFnMapTaskExecutorFactory.java | 20 --------------------
.../fn/control/ProcessRemoteBundleOperation.java | 5 ++---
.../graph/CreateExecutableStageNodeFunction.java | 5 +++--
.../worker/IntrinsicMapTaskExecutorFactoryTest.java | 6 +++---
.../SingleEnvironmentInstanceJobBundleFactory.java | 7 +------
.../environment/StaticRemoteEnvironment.java | 16 +++++++---------
.../environment/StaticRemoteEnvironmentFactory.java | 12 +++++-------
...ingleEnvironmentInstanceJobBundleFactoryTest.java | 3 ++-
10 files changed, 31 insertions(+), 59 deletions(-)
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 1d1c56c..e532355 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -82,6 +82,7 @@ import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
+import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -97,6 +98,8 @@ public class ReferenceRunner {
private final EnvironmentType environmentType;
+ private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
+
private ReferenceRunner(
Pipeline p, Struct options, @Nullable File artifactsDir, EnvironmentType environmentType) {
this.pipeline = executable(p);
@@ -197,7 +200,8 @@ public class ReferenceRunner {
EnvironmentFactory environmentFactory =
createEnvironmentFactory(control, logging, artifact, provisioning, controlClientPool);
JobBundleFactory jobBundleFactory =
- SingleEnvironmentInstanceJobBundleFactory.create(environmentFactory, data, state, null);
+ SingleEnvironmentInstanceJobBundleFactory.create(
+ environmentFactory, data, state, idGenerator);
TransformEvaluatorRegistry transformRegistry =
TransformEvaluatorRegistry.portableRegistry(
@@ -238,12 +242,7 @@ public class ReferenceRunner {
case DOCKER:
return new DockerEnvironmentFactory.Provider(PipelineOptionsTranslation.fromProto(options))
.createEnvironmentFactory(
- control,
- logging,
- artifact,
- provisioning,
- controlClient,
- IdGenerators.incrementingLongs());
+ control, logging, artifact, provisioning, controlClient, idGenerator);
case IN_PROCESS:
return EmbeddedEnvironmentFactory.create(
PipelineOptionsFactory.create(), logging, control, controlClient.getSource());
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
index 2dfaac9..d90d66d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
@@ -50,6 +50,7 @@ import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
@@ -111,7 +112,7 @@ public class RemoteStageEvaluatorFactoryTest implements Serializable {
bundleFactory = ImmutableListBundleFactory.create();
JobBundleFactory jobBundleFactory =
SingleEnvironmentInstanceJobBundleFactory.create(
- environmentFactory, dataServer, stateServer, null);
+ environmentFactory, dataServer, stateServer, IdGenerators.incrementingLongs());
factory = new RemoteStageEvaluatorFactory(bundleFactory, jobBundleFactory);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 7db5339..78bba56 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -148,26 +148,6 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
Networks.replaceDirectedNetworkNodes(
network, createOutputReceiversTransform(stageName, counterSet));
- // Swap out all the RegisterFnRequest nodes with Operation nodes
- Networks.replaceDirectedNetworkNodes(
- network,
- createOperationTransformForRegisterFnNodes(
- idGenerator,
- instructionRequestHandler,
- grpcStateFnServer.getService(),
- stageName,
- executionContext));
-
- // Swap out all the RemoteGrpcPort nodes with Operation nodes, note that it is expected
- // that the RegisterFnRequest nodes have already been replaced.
- Networks.replaceDirectedNetworkNodes(
- network,
- createOperationTransformForGrpcPortNodes(
- network,
- grpcDataFnServer.getService(),
- // TODO: Set NameContext properly for these operations.
- executionContext.createOperationContext(
- NameContext.create(stageName, stageName, stageName, stageName))));
if (DataflowRunner.hasExperiment(
options.as(DataflowPipelineDebugOptions.class), "use_executable_stage_bundle_execution")) {
LOG.debug("Using SingleEnvironmentInstanceJobBundleFactory");
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
index 3d44331..ae8a13e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
@@ -35,9 +35,8 @@ import org.slf4j.LoggerFactory;
/**
* This {@link org.apache.beam.runners.dataflow.worker.util.common.worker.Operation} is responsible
* for communicating with the SDK harness and asking it to process a bundle of work. This operation
- * request a RemoteBundle{@link org.apache.beam.runners.fnexecution.control.RemoteBundle}, send data
- * elements to SDK and receive processed results from SDK, then pass these elements to next
- * Operations.
+ * requests a {@link org.apache.beam.runners.fnexecution.control.RemoteBundle}, sends elements to
+ * SDK and receives processed results from SDK, passing these elements downstream.
*/
public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
private static final Logger LOG = LoggerFactory.getLogger(ProcessRemoteBundleOperation.class);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index ab8cd0b..c93e9e9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -82,8 +82,9 @@ import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.InvalidProtocolBufferException;
/**
- * Converts a {@link Network} representation of {@link MapTask} destined for the SDK harness into an
- * {@link Node} containing {@link org.apache.beam.runners.core.construction.graph.ExecutableStage}.
+ * Converts a {@link Network} representation of {@link MapTask} destined for the SDK harness into a
+ * {@link Node} containing an {@link
+ * org.apache.beam.runners.core.construction.graph.ExecutableStage}.
*/
public class CreateExecutableStageNodeFunction
implements Function<MutableNetwork<Node, Edge>, Node> {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 6b12b21..182ad50 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -173,9 +173,9 @@ public class IntrinsicMapTaskExecutorFactoryTest {
try (DataflowMapTaskExecutor executor =
mapTaskExecutorFactory.create(
null /* beamFnControlClientHandler */,
- null /* beamFnDataService */,
- null /* beamFnStateService */,
- null,
+ null /* GrpcFnServer<GrpcDataService> */,
+ null /* ApiServiceDescriptor */,
+ null, /* GrpcFnServer<GrpcStateService> */
mapTaskToNetwork.apply(mapTask),
options,
STAGE,
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
index 23b752b..b7d0c58 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
@@ -35,7 +35,6 @@ import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
-import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.util.WindowedValue;
@@ -78,11 +77,7 @@ public class SingleEnvironmentInstanceJobBundleFactory implements JobBundleFacto
this.environmentFactory = environmentFactory;
this.dataService = dataService;
this.stateService = stateService;
- if (idGenerator != null) {
- this.idGenerator = idGenerator;
- } else {
- this.idGenerator = IdGenerators.incrementingLongs();
- }
+ this.idGenerator = idGenerator;
}
@Override
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
index 655a2e7..79c68ab 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironment.java
@@ -30,7 +30,6 @@ public class StaticRemoteEnvironment implements RemoteEnvironment {
return new StaticRemoteEnvironment(environment, instructionRequestHandler);
}
- private final Object lock = new Object();
private final Environment environment;
private final InstructionRequestHandler instructionRequestHandler;
@@ -53,14 +52,13 @@ public class StaticRemoteEnvironment implements RemoteEnvironment {
}
@Override
- public void close() throws Exception {
- synchronized (lock) {
- // The running docker container and instruction handler should each only be terminated once.
- // Do nothing if we have already requested termination.
- if (!isClosed) {
- isClosed = true;
- this.instructionRequestHandler.close();
- }
+ public synchronized void close() throws Exception {
+ // The instruction handler should only be terminated once, so repeated calls are no-ops.
+ // Do nothing if we have already requested termination.
+ if (isClosed) {
+ return;
}
+ isClosed = true;
+ this.instructionRequestHandler.close();
}
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
index 5b35462..29a0721 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/StaticRemoteEnvironmentFactory.java
@@ -28,20 +28,18 @@ import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionServi
import org.apache.beam.sdk.fn.IdGenerator;
/**
- * An {@link EnvironmentFactory} that creates StaticRemoteEnvironment used by Dataflow runner
- * harness.
+ * An {@link EnvironmentFactory} that creates StaticRemoteEnvironment used by a runner harness that
+ * would like to use an existing InstructionRequestHandler.
*/
public class StaticRemoteEnvironmentFactory implements EnvironmentFactory {
public static StaticRemoteEnvironmentFactory forService(
InstructionRequestHandler instructionRequestHandler) {
- StaticRemoteEnvironmentFactory factory = new StaticRemoteEnvironmentFactory();
- factory.setStaticServiceContent(instructionRequestHandler);
- return factory;
+ return new StaticRemoteEnvironmentFactory(instructionRequestHandler);
}
- private InstructionRequestHandler instructionRequestHandler;
+ private final InstructionRequestHandler instructionRequestHandler;
- private void setStaticServiceContent(InstructionRequestHandler instructionRequestHandler) {
+ private StaticRemoteEnvironmentFactory(InstructionRequestHandler instructionRequestHandler) {
this.instructionRequestHandler = instructionRequestHandler;
}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
index 30e1bfe..a795e3c 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.transforms.Create;
import org.junit.After;
@@ -84,7 +85,7 @@ public class SingleEnvironmentInstanceJobBundleFactoryTest {
factory =
SingleEnvironmentInstanceJobBundleFactory.create(
- environmentFactory, dataServer, stateServer, null);
+ environmentFactory, dataServer, stateServer, IdGenerators.incrementingLongs());
}
@After