You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2019/03/12 18:56:22 UTC
[beam] branch master updated: [BEAM-6504] Connect SideInputHandler
in dataflow runner (Part 2) (#7926)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8502b36 [BEAM-6504] Connect SideInputHandler in dataflow runner (Part 2) (#7926)
8502b36 is described below
commit 8502b368e2d3806499091d5bc4d7066f12729843
Author: Ruoyun Huang <hu...@gmail.com>
AuthorDate: Tue Mar 12 11:55:53 2019 -0700
[BEAM-6504] Connect SideInputHandler in dataflow runner (Part 2) (#7926)
* Connect SideInputHandler in dataflow runner.
* Remove unused LOG info
* Remove redundant `include` line
---
runners/google-cloud-dataflow-java/build.gradle | 4 -
.../worker/BeamFnMapTaskExecutorFactory.java | 85 +++++++++++++++++++++-
.../fn/control/ProcessRemoteBundleOperation.java | 40 +++++++++-
.../graph/CreateExecutableStageNodeFunction.java | 65 ++++++++++++++++-
.../beam/runners/dataflow/worker/graph/Nodes.java | 13 +++-
.../SingleEnvironmentInstanceJobBundleFactory.java | 5 +-
6 files changed, 196 insertions(+), 16 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index edba36d..fa99117 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -264,8 +264,6 @@ task validatesRunnerFnApiWorkerExecutableStageTest(type: Test) {
maxParallelForks Integer.MAX_VALUE
classpath = configurations.validatesRunner
testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
- // TODO(BEAM-6232): ViewTest tests sideinputs, which is not supported bu current bundle execution.
- exclude '**/ViewTest.class'
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
commonExcludeCategories.each {
@@ -274,8 +272,6 @@ task validatesRunnerFnApiWorkerExecutableStageTest(type: Test) {
fnApiWorkerExcludeCategories.each {
excludeCategories it
}
- // TODO(BEAM-6232): Support sideinput.
- excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs'
// TODO(BEAM-6233): Support timer and state.
excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 8692e6a..f54c207 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.function.Function;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.ElementByteSizeObservable;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -347,16 +348,42 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
outputReceiverNode.getPcollectionId(),
outputReceiverNode.getOutputReceiver()));
- DataflowOperationContext operationContext =
+ DataflowOperationContext operationContextStage =
executionContext.createOperationContext(
NameContext.create(stageName, stageName, stageName, stageName));
-
TimerReceiver timerReceiver =
new TimerReceiver(
input.getExecutableStage().getComponents(),
- executionContext.getStepContext(operationContext).namespacedToUser(),
+ executionContext.getStepContext(operationContextStage).namespacedToUser(),
stageBundleFactory);
+ ImmutableMap.Builder<String, DataflowOperationContext>
+ ptransformIdToOperationContextBuilder = ImmutableMap.builder();
+
+ for (Map.Entry<String, NameContext> entry :
+ input.getPTransformIdToPartialNameContextMap().entrySet()) {
+ NameContext fullNameContext =
+ NameContext.create(
+ stageName,
+ entry.getValue().originalName(),
+ entry.getValue().systemName(),
+ entry.getValue().userName());
+
+ DataflowOperationContext operationContext =
+ executionContext.createOperationContext(fullNameContext);
+ ptransformIdToOperationContextBuilder.put(entry.getKey(), operationContext);
+ }
+
+ ImmutableMap<String, DataflowOperationContext> ptransformIdToOperationContexts =
+ ptransformIdToOperationContextBuilder.build();
+
+ ImmutableMap<String, SideInputReader> ptransformIdToSideInputReaders =
+ buildPTransformIdToSideInputReadersMap(
+ executionContext, input, ptransformIdToOperationContexts);
+
+ Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>
+ ptransformIdToSideInputIdToPCollectionView = buildSideInputIdToPCollectionView(input);
+
return OperationNode.create(
new ProcessRemoteBundleOperation(
input.getExecutableStage(),
@@ -364,7 +391,9 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
NameContext.create(stageName, stageName, stageName, stageName)),
stageBundleFactory,
outputReceiverMap,
- timerReceiver));
+ timerReceiver,
+ ptransformIdToSideInputReaders,
+ ptransformIdToSideInputIdToPCollectionView));
}
};
}
@@ -453,6 +482,33 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
return ptransformIdToSideInputReaders.build();
}
+ /** Returns a map from PTransform id to side input reader. */
+ private static ImmutableMap<String, SideInputReader> buildPTransformIdToSideInputReadersMap(
+ DataflowExecutionContext executionContext,
+ ExecutableStageNode registerRequestNode,
+ ImmutableMap<String, DataflowOperationContext> ptransformIdToOperationContexts) {
+
+ ImmutableMap.Builder<String, SideInputReader> ptransformIdToSideInputReaders =
+ ImmutableMap.builder();
+ for (Map.Entry<String, Iterable<PCollectionView<?>>> ptransformIdToPCollectionView :
+ registerRequestNode.getPTransformIdToPCollectionViewMap().entrySet()) {
+ try {
+ ptransformIdToSideInputReaders.put(
+ ptransformIdToPCollectionView.getKey(),
+ executionContext.getSideInputReader(
+ // Note that the side input infos will only be populated for a batch pipeline
+ registerRequestNode
+ .getPTransformIdToSideInputInfoMap()
+ .get(ptransformIdToPCollectionView.getKey()),
+ ptransformIdToPCollectionView.getValue(),
+ ptransformIdToOperationContexts.get(ptransformIdToPCollectionView.getKey())));
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ return ptransformIdToSideInputReaders.build();
+ }
+
/**
* Returns a table where the row key is the PTransform id, the column key is the side input id,
* and the value is the corresponding PCollectionView.
@@ -474,6 +530,27 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
return ptransformIdToSideInputIdToPCollectionViewBuilder.build();
}
+ /** Returns a map where key is the SideInput id, value is PCollectionView. */
+ private static Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>
+ buildSideInputIdToPCollectionView(ExecutableStageNode executableStageNode) {
+ ImmutableMap.Builder<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>
+ sideInputIdToPCollectionViewMapBuilder = ImmutableMap.builder();
+
+ for (Map.Entry<String, Iterable<PCollectionView<?>>> ptransformIdToPCollectionViews :
+ executableStageNode.getPTransformIdToPCollectionViewMap().entrySet()) {
+ for (PCollectionView<?> pCollectionView : ptransformIdToPCollectionViews.getValue()) {
+ sideInputIdToPCollectionViewMapBuilder.put(
+ RunnerApi.ExecutableStagePayload.SideInputId.newBuilder()
+ .setTransformId(ptransformIdToPCollectionViews.getKey())
+ .setLocalName(pCollectionView.getTagInternal().getId())
+ .build(),
+ pCollectionView);
+ }
+ }
+
+ return sideInputIdToPCollectionViewMapBuilder.build();
+ }
+
/**
* Creates an {@link Operation} from the given {@link ParallelInstruction} definition using the
* provided {@link ReaderFactory}.
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
index a0bcd6f..f9ff270 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
@@ -18,16 +18,23 @@
package org.apache.beam.runners.dataflow.worker.fn.control;
import java.io.Closeable;
+import java.io.IOException;
import java.util.*;
+import java.util.EnumMap;
import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ReceivingOperation;
import org.apache.beam.runners.fnexecution.control.*;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,15 +68,44 @@ public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
DataflowOperationContext operationContext,
StageBundleFactory stageBundleFactory,
Map<String, OutputReceiver> outputReceiverMap,
- TimerReceiver timerReceiver) {
+ TimerReceiver timerReceiver,
+ Map<String, SideInputReader> ptransformIdToSideInputReader,
+ Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>
+ sideInputIdToPCollectionViewMap) {
super(EMPTY_RECEIVER_ARRAY, operationContext);
this.timerReceiver = timerReceiver;
this.stageBundleFactory = stageBundleFactory;
- this.stateRequestHandler = StateRequestHandler.unsupported();
this.progressHandler = BundleProgressHandler.ignored();
this.executableStage = executableStage;
this.outputReceiverMap = outputReceiverMap;
+
+ StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory =
+ DataflowSideInputHandlerFactory.of(
+ ptransformIdToSideInputReader, sideInputIdToPCollectionViewMap);
+
+ stateRequestHandler = getStateRequestHandler(executableStage, sideInputHandlerFactory);
+ }
+
+ private StateRequestHandler getStateRequestHandler(
+ ExecutableStage executableStage,
+ StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory) {
+ final StateRequestHandler sideInputHandler;
+
+ try {
+ sideInputHandler =
+ StateRequestHandlers.forSideInputHandlerFactory(
+ ProcessBundleDescriptors.getSideInputs(executableStage), sideInputHandlerFactory);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to setup state handler", e);
+ }
+
+ EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlerMap =
+ new EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler>(
+ BeamFnApi.StateKey.TypeCase.class);
+ handlerMap.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputHandler);
+
+ return StateRequestHandlers.delegateBasedUponType(handlerMap);
}
@Override
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index a2cf3ab..a96ceac 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker.graph;
import static org.apache.beam.runners.dataflow.util.Structs.getBytes;
import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static org.apache.beam.runners.dataflow.worker.graph.LengthPrefixUnknownCoders.forSideInputInfos;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -28,9 +29,11 @@ import com.google.api.services.dataflow.model.MultiOutputInfo;
import com.google.api.services.dataflow.model.ParDoInstruction;
import com.google.api.services.dataflow.model.ParallelInstruction;
import com.google.api.services.dataflow.model.ReadInstruction;
+import com.google.api.services.dataflow.model.SideInputInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -73,9 +76,11 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.graph.MutableNetwork;
@@ -189,6 +194,12 @@ public class CreateExecutableStageNodeFunction
Map<Node, String> nodesToPCollections = new HashMap<>();
ImmutableMap.Builder<String, NameContext> ptransformIdToNameContexts = ImmutableMap.builder();
+
+ ImmutableMap.Builder<String, Iterable<SideInputInfo>> ptransformIdToSideInputInfos =
+ ImmutableMap.builder();
+ ImmutableMap.Builder<String, Iterable<PCollectionView<?>>> ptransformIdToPCollectionViews =
+ ImmutableMap.builder();
+
// A field of ExecutableStage which includes the PCollection goes to worker side.
Set<PCollectionNode> executableStageOutputs = new HashSet<>();
// A field of ExecutableStage which includes the PCollection goes to runner side.
@@ -279,12 +290,15 @@ public class CreateExecutableStageNodeFunction
}
componentsBuilder.putAllCoders(sdkComponents.toComponents().getCodersMap());
+
Set<PTransformNode> executableStageTransforms = new HashSet<>();
Set<TimerReference> executableStageTimers = new HashSet<>();
List<UserStateId> userStateIds = new ArrayList<>();
+ Set<SideInputReference> executableStageSideInputs = new HashSet<>();
for (ParallelInstructionNode node :
Iterables.filter(input.nodes(), ParallelInstructionNode.class)) {
+ ImmutableMap.Builder<String, PCollectionNode> sideInputIds = ImmutableMap.builder();
ParallelInstruction parallelInstruction = node.getParallelInstruction();
String ptransformId = "generatedPtransform" + idGenerator.getId();
ptransformIdToNameContexts.put(
@@ -306,6 +320,7 @@ public class CreateExecutableStageNodeFunction
if (userFnClassName.equals("CombineValuesFn") || userFnClassName.equals("KeyedCombineFn")) {
transformSpec = transformCombineValuesFnToFunctionSpec(userFnSpec);
+ ptransformIdToPCollectionViews.put(ptransformId, Collections.emptyList());
} else {
String parDoPTransformId = getString(userFnSpec, PropertyNames.SERIALIZED_FN);
@@ -347,6 +362,33 @@ public class CreateExecutableStageNodeFunction
userStateIds.add(builder.build());
}
+ // To facilitate the creation of Set executableStageSideInputs.
+ for (String sideInputTag : parDoPayload.getSideInputsMap().keySet()) {
+ String sideInputPCollectionId = parDoPTransform.getInputsOrThrow(sideInputTag);
+ RunnerApi.PCollection sideInputPCollection =
+ pipeline.getComponents().getPcollectionsOrThrow(sideInputPCollectionId);
+
+ pTransform.putInputs(sideInputTag, sideInputPCollectionId);
+
+ PCollectionNode pCollectionNode =
+ PipelineNode.pCollection(sideInputPCollectionId, sideInputPCollection);
+ sideInputIds.put(sideInputTag, pCollectionNode);
+ }
+
+ // To facilitate the creation of Map(ptransformId -> pCollectionView), which is
+ // required by constructing an ExecutableStageNode.
+ ImmutableList.Builder<PCollectionView<?>> pcollectionViews = ImmutableList.builder();
+ for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry :
+ parDoPayload.getSideInputsMap().entrySet()) {
+ pcollectionViews.add(
+ RegisterNodeFunction.transformSideInputForRunner(
+ pipeline,
+ parDoPTransform,
+ sideInputEntry.getKey(),
+ sideInputEntry.getValue()));
+ }
+ ptransformIdToPCollectionViews.put(ptransformId, pcollectionViews.build());
+
transformSpec
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.setPayload(parDoPayload.toByteString());
@@ -358,6 +400,11 @@ public class CreateExecutableStageNodeFunction
.setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
.setPayload(ByteString.copyFrom(userFnBytes));
}
+
+ if (parDoInstruction.getSideInputs() != null) {
+ ptransformIdToSideInputInfos.put(
+ ptransformId, forSideInputInfos(parDoInstruction.getSideInputs(), true));
+ }
}
} else if (parallelInstruction.getRead() != null) {
ReadInstruction readInstruction = parallelInstruction.getRead();
@@ -403,6 +450,16 @@ public class CreateExecutableStageNodeFunction
for (String timerId : timerIds) {
executableStageTimers.add(TimerReference.of(pTransformNode, timerId));
}
+
+ ImmutableMap<String, PCollectionNode> sideInputIdToPCollectionNodes = sideInputIds.build();
+ for (String sideInputTag : sideInputIdToPCollectionNodes.keySet()) {
+ SideInputReference sideInputReference =
+ SideInputReference.of(
+ pTransformNode, sideInputTag, sideInputIdToPCollectionNodes.get(sideInputTag));
+ executableStageSideInputs.add(sideInputReference);
+ }
+
+ executableStageTransforms.add(pTransformNode);
}
if (executableStageInputs.size() != 1) {
@@ -419,9 +476,7 @@ public class CreateExecutableStageNodeFunction
executableStageEnv = Environments.JAVA_SDK_HARNESS_ENVIRONMENT;
}
- Set<SideInputReference> executableStageSideInputs = new HashSet<>();
Set<UserStateReference> executableStageUserStateReference = new HashSet<>();
-
for (UserStateId userStateId : userStateIds) {
executableStageUserStateReference.add(
UserStateReference.fromUserStateId(userStateId, executableStageComponents));
@@ -437,7 +492,11 @@ public class CreateExecutableStageNodeFunction
executableStageTimers,
executableStageTransforms,
executableStageOutputs);
- return ExecutableStageNode.create(executableStage, ptransformIdToNameContexts.build());
+ return ExecutableStageNode.create(
+ executableStage,
+ ptransformIdToNameContexts.build(),
+ ptransformIdToSideInputInfos.build(),
+ ptransformIdToPCollectionViews.build());
}
private Environment getEnvironmentFromPTransform(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
index e4f45dc..7a2d873 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
@@ -319,17 +319,26 @@ public class Nodes {
public abstract static class ExecutableStageNode extends Node {
public static ExecutableStageNode create(
ExecutableStage executableStage,
- Map<String, NameContext> ptransformIdToPartialNameContextMap) {
+ Map<String, NameContext> ptransformIdToPartialNameContextMap,
+ Map<String, Iterable<SideInputInfo>> ptransformIdToSideInputInfoMap,
+ Map<String, Iterable<PCollectionView<?>>> pTransformIdToPCollectionViewMap) {
checkNotNull(executableStage);
checkNotNull(ptransformIdToPartialNameContextMap);
return new AutoValue_Nodes_ExecutableStageNode(
- executableStage, ptransformIdToPartialNameContextMap);
+ executableStage,
+ ptransformIdToPartialNameContextMap,
+ ptransformIdToSideInputInfoMap,
+ pTransformIdToPCollectionViewMap);
}
public abstract ExecutableStage getExecutableStage();
public abstract Map<String, NameContext> getPTransformIdToPartialNameContextMap();
+ public abstract Map<String, Iterable<SideInputInfo>> getPTransformIdToSideInputInfoMap();
+
+ public abstract Map<String, Iterable<PCollectionView<?>>> getPTransformIdToPCollectionViewMap();
+
@Override
public String toString() {
// The request may be very large.
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
index 46dc8c3..38fcc93 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
@@ -104,7 +104,10 @@ public class SingleEnvironmentInstanceJobBundleFactory implements JobBundleFacto
try {
descriptor =
ProcessBundleDescriptors.fromExecutableStage(
- idGenerator.getId(), stage, dataService.getApiServiceDescriptor());
+ idGenerator.getId(),
+ stage,
+ dataService.getApiServiceDescriptor(),
+ stateService.getApiServiceDescriptor());
} catch (IOException e) {
throw new RuntimeException(e);
}