Posted to commits@beam.apache.org by lc...@apache.org on 2019/03/12 18:56:22 UTC

[beam] branch master updated: [BEAM-6504] Connect SideInputHandler in dataflow runner (Part 2) (#7926)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8502b36  [BEAM-6504] Connect SideInputHandler in dataflow runner (Part 2) (#7926)
8502b36 is described below

commit 8502b368e2d3806499091d5bc4d7066f12729843
Author: Ruoyun Huang <hu...@gmail.com>
AuthorDate: Tue Mar 12 11:55:53 2019 -0700

    [BEAM-6504] Connect SideInputHandler in dataflow runner (Part 2) (#7926)
    
    * Connect SideInputHandler in dataflow runner.
    
    * Remove unused LOG info
    
    * Remove redundant `include` line
---
 runners/google-cloud-dataflow-java/build.gradle    |  4 -
 .../worker/BeamFnMapTaskExecutorFactory.java       | 85 +++++++++++++++++++++-
 .../fn/control/ProcessRemoteBundleOperation.java   | 40 +++++++++-
 .../graph/CreateExecutableStageNodeFunction.java   | 65 ++++++++++++++++-
 .../beam/runners/dataflow/worker/graph/Nodes.java  | 13 +++-
 .../SingleEnvironmentInstanceJobBundleFactory.java |  5 +-
 6 files changed, 196 insertions(+), 16 deletions(-)
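
At a high level, this change answers MULTIMAP_SIDE_INPUT state requests from
the SDK harness by routing them to the Dataflow worker's existing
SideInputReaders. A minimal sketch of the wiring, condensed from the diffs
below (the two input maps are assumed to be built as in
BeamFnMapTaskExecutorFactory; DataflowSideInputHandlerFactory is used without
an import in the diff, so it presumably lives in the same package as
ProcessRemoteBundleOperation):

    import java.io.IOException;
    import java.util.EnumMap;
    import java.util.Map;
    import org.apache.beam.model.fnexecution.v1.BeamFnApi;
    import org.apache.beam.model.pipeline.v1.RunnerApi;
    import org.apache.beam.runners.core.SideInputReader;
    import org.apache.beam.runners.core.construction.graph.ExecutableStage;
    import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
    import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
    import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
    import org.apache.beam.sdk.values.PCollectionView;

    // Sketch: build a StateRequestHandler that serves multimap side input
    // requests from the worker's SideInputReaders.
    static StateRequestHandler buildStateRequestHandler(
        ExecutableStage executableStage,
        Map<String, SideInputReader> ptransformIdToSideInputReader,
        Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>
            sideInputIdToPCollectionViewMap)
        throws IOException {
      StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory =
          DataflowSideInputHandlerFactory.of(
              ptransformIdToSideInputReader, sideInputIdToPCollectionViewMap);
      StateRequestHandler sideInputHandler =
          StateRequestHandlers.forSideInputHandlerFactory(
              ProcessBundleDescriptors.getSideInputs(executableStage),
              sideInputHandlerFactory);
      // Dispatch on the state key type; only MULTIMAP_SIDE_INPUT is registered.
      EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlerMap =
          new EnumMap<>(BeamFnApi.StateKey.TypeCase.class);
      handlerMap.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputHandler);
      return StateRequestHandlers.delegateBasedUponType(handlerMap);
    }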

diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index edba36d..fa99117 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -264,8 +264,6 @@ task validatesRunnerFnApiWorkerExecutableStageTest(type: Test) {
     maxParallelForks Integer.MAX_VALUE
     classpath = configurations.validatesRunner
     testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
-    // TODO(BEAM-6232): ViewTest tests sideinputs, which is not supported bu current bundle execution.
-    exclude '**/ViewTest.class'
     useJUnit {
         includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
         commonExcludeCategories.each {
@@ -274,8 +272,6 @@ task validatesRunnerFnApiWorkerExecutableStageTest(type: Test) {
         fnApiWorkerExcludeCategories.each {
             excludeCategories it
         }
-        // TODO(BEAM-6232): Support sideinput.
-        excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs'
         // TODO(BEAM-6233): Support timer and state.
         excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo'
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 8692e6a..f54c207 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.function.Function;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.ElementByteSizeObservable;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -347,16 +348,42 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
                         outputReceiverNode.getPcollectionId(),
                         outputReceiverNode.getOutputReceiver()));
 
-        DataflowOperationContext operationContext =
+        DataflowOperationContext operationContextStage =
             executionContext.createOperationContext(
                 NameContext.create(stageName, stageName, stageName, stageName));
-
         TimerReceiver timerReceiver =
             new TimerReceiver(
                 input.getExecutableStage().getComponents(),
-                executionContext.getStepContext(operationContext).namespacedToUser(),
+                executionContext.getStepContext(operationContextStage).namespacedToUser(),
                 stageBundleFactory);
 
+        ImmutableMap.Builder<String, DataflowOperationContext>
+            ptransformIdToOperationContextBuilder = ImmutableMap.builder();
+
+        for (Map.Entry<String, NameContext> entry :
+            input.getPTransformIdToPartialNameContextMap().entrySet()) {
+          NameContext fullNameContext =
+              NameContext.create(
+                  stageName,
+                  entry.getValue().originalName(),
+                  entry.getValue().systemName(),
+                  entry.getValue().userName());
+
+          DataflowOperationContext operationContext =
+              executionContext.createOperationContext(fullNameContext);
+          ptransformIdToOperationContextBuilder.put(entry.getKey(), operationContext);
+        }
+
+        ImmutableMap<String, DataflowOperationContext> ptransformIdToOperationContexts =
+            ptransformIdToOperationContextBuilder.build();
+
+        ImmutableMap<String, SideInputReader> ptransformIdToSideInputReaders =
+            buildPTransformIdToSideInputReadersMap(
+                executionContext, input, ptransformIdToOperationContexts);
+
+        Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>
+            ptransformIdToSideInputIdToPCollectionView = buildSideInputIdToPCollectionView(input);
+
         return OperationNode.create(
             new ProcessRemoteBundleOperation(
                 input.getExecutableStage(),
@@ -364,7 +391,9 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
                     NameContext.create(stageName, stageName, stageName, stageName)),
                 stageBundleFactory,
                 outputReceiverMap,
-                timerReceiver));
+                timerReceiver,
+                ptransformIdToSideInputReaders,
+                ptransformIdToSideInputIdToPCollectionView));
       }
     };
   }
@@ -453,6 +482,33 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
     return ptransformIdToSideInputReaders.build();
   }
 
+  /** Returns a map from PTransform id to side input reader. */
+  private static ImmutableMap<String, SideInputReader> buildPTransformIdToSideInputReadersMap(
+      DataflowExecutionContext executionContext,
+      ExecutableStageNode registerRequestNode,
+      ImmutableMap<String, DataflowOperationContext> ptransformIdToOperationContexts) {
+
+    ImmutableMap.Builder<String, SideInputReader> ptransformIdToSideInputReaders =
+        ImmutableMap.builder();
+    for (Map.Entry<String, Iterable<PCollectionView<?>>> ptransformIdToPCollectionView :
+        registerRequestNode.getPTransformIdToPCollectionViewMap().entrySet()) {
+      try {
+        ptransformIdToSideInputReaders.put(
+            ptransformIdToPCollectionView.getKey(),
+            executionContext.getSideInputReader(
+                // Note that the side input infos will only be populated for a batch pipeline
+                registerRequestNode
+                    .getPTransformIdToSideInputInfoMap()
+                    .get(ptransformIdToPCollectionView.getKey()),
+                ptransformIdToPCollectionView.getValue(),
+                ptransformIdToOperationContexts.get(ptransformIdToPCollectionView.getKey())));
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+    return ptransformIdToSideInputReaders.build();
+  }
+
   /**
    * Returns a table where the row key is the PTransform id, the column key is the side input id,
    * and the value is the corresponding PCollectionView.
@@ -474,6 +530,27 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
     return ptransformIdToSideInputIdToPCollectionViewBuilder.build();
   }
 
+  /** Returns a map from SideInput id to the corresponding PCollectionView. */
+  private static Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>
+      buildSideInputIdToPCollectionView(ExecutableStageNode executableStageNode) {
+    ImmutableMap.Builder<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>
+        sideInputIdToPCollectionViewMapBuilder = ImmutableMap.builder();
+
+    for (Map.Entry<String, Iterable<PCollectionView<?>>> ptransformIdToPCollectionViews :
+        executableStageNode.getPTransformIdToPCollectionViewMap().entrySet()) {
+      for (PCollectionView<?> pCollectionView : ptransformIdToPCollectionViews.getValue()) {
+        sideInputIdToPCollectionViewMapBuilder.put(
+            RunnerApi.ExecutableStagePayload.SideInputId.newBuilder()
+                .setTransformId(ptransformIdToPCollectionViews.getKey())
+                .setLocalName(pCollectionView.getTagInternal().getId())
+                .build(),
+            pCollectionView);
+      }
+    }
+
+    return sideInputIdToPCollectionViewMapBuilder.build();
+  }
+
   /**
    * Creates an {@link Operation} from the given {@link ParallelInstruction} definition using the
    * provided {@link ReaderFactory}.
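
The keys of the new view map are ExecutableStagePayload.SideInputId protos,
which pair a PTransform id with the view's local tag. Condensed from
buildSideInputIdToPCollectionView above (ptransformId and pCollectionView
stand in for the loop variables):

    // Sketch: key a PCollectionView by (transform id, local name).
    RunnerApi.ExecutableStagePayload.SideInputId sideInputId =
        RunnerApi.ExecutableStagePayload.SideInputId.newBuilder()
            .setTransformId(ptransformId)
            .setLocalName(pCollectionView.getTagInternal().getId())
            .build();
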
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
index a0bcd6f..f9ff270 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
@@ -18,16 +18,23 @@
 package org.apache.beam.runners.dataflow.worker.fn.control;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.*;
+import java.util.EnumMap;
 import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ReceivingOperation;
 import org.apache.beam.runners.fnexecution.control.*;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,15 +68,44 @@ public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
       DataflowOperationContext operationContext,
       StageBundleFactory stageBundleFactory,
       Map<String, OutputReceiver> outputReceiverMap,
-      TimerReceiver timerReceiver) {
+      TimerReceiver timerReceiver,
+      Map<String, SideInputReader> ptransformIdToSideInputReader,
+      Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>
+          sideInputIdToPCollectionViewMap) {
     super(EMPTY_RECEIVER_ARRAY, operationContext);
 
     this.timerReceiver = timerReceiver;
     this.stageBundleFactory = stageBundleFactory;
-    this.stateRequestHandler = StateRequestHandler.unsupported();
     this.progressHandler = BundleProgressHandler.ignored();
     this.executableStage = executableStage;
     this.outputReceiverMap = outputReceiverMap;
+
+    StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory =
+        DataflowSideInputHandlerFactory.of(
+            ptransformIdToSideInputReader, sideInputIdToPCollectionViewMap);
+
+    stateRequestHandler = getStateRequestHandler(executableStage, sideInputHandlerFactory);
+  }
+
+  private StateRequestHandler getStateRequestHandler(
+      ExecutableStage executableStage,
+      StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory) {
+    final StateRequestHandler sideInputHandler;
+
+    try {
+      sideInputHandler =
+          StateRequestHandlers.forSideInputHandlerFactory(
+              ProcessBundleDescriptors.getSideInputs(executableStage), sideInputHandlerFactory);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to setup state handler", e);
+    }
+
+    EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlerMap =
+        new EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler>(
+            BeamFnApi.StateKey.TypeCase.class);
+    handlerMap.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputHandler);
+
+    return StateRequestHandlers.delegateBasedUponType(handlerMap);
   }
 
   @Override
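
The EnumMap-based dispatch leaves room for the remaining TODOs: each state
key type gets its own StateRequestHandler entry, so user state (BEAM-6233)
could later be registered the same way. A hypothetical extension
(userStateHandler is a placeholder, not part of this change):

    // Hypothetical: register further state key types alongside side inputs.
    handlerMap.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputHandler);
    handlerMap.put(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE, userStateHandler);
    return StateRequestHandlers.delegateBasedUponType(handlerMap);
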
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index a2cf3ab..a96ceac 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker.graph;
 
 import static org.apache.beam.runners.dataflow.util.Structs.getBytes;
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static org.apache.beam.runners.dataflow.worker.graph.LengthPrefixUnknownCoders.forSideInputInfos;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -28,9 +29,11 @@ import com.google.api.services.dataflow.model.MultiOutputInfo;
 import com.google.api.services.dataflow.model.ParDoInstruction;
 import com.google.api.services.dataflow.model.ParallelInstruction;
 import com.google.api.services.dataflow.model.ReadInstruction;
+import com.google.api.services.dataflow.model.SideInputInfo;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Base64;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -73,9 +76,11 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.graph.MutableNetwork;
@@ -189,6 +194,12 @@ public class CreateExecutableStageNodeFunction
 
     Map<Node, String> nodesToPCollections = new HashMap<>();
     ImmutableMap.Builder<String, NameContext> ptransformIdToNameContexts = ImmutableMap.builder();
+
+    ImmutableMap.Builder<String, Iterable<SideInputInfo>> ptransformIdToSideInputInfos =
+        ImmutableMap.builder();
+    ImmutableMap.Builder<String, Iterable<PCollectionView<?>>> ptransformIdToPCollectionViews =
+        ImmutableMap.builder();
+
     // A field of ExecutableStage which includes the PCollection goes to worker side.
     Set<PCollectionNode> executableStageOutputs = new HashSet<>();
     // A field of ExecutableStage which includes the PCollection goes to runner side.
@@ -279,12 +290,15 @@ public class CreateExecutableStageNodeFunction
     }
 
     componentsBuilder.putAllCoders(sdkComponents.toComponents().getCodersMap());
+
     Set<PTransformNode> executableStageTransforms = new HashSet<>();
     Set<TimerReference> executableStageTimers = new HashSet<>();
     List<UserStateId> userStateIds = new ArrayList<>();
+    Set<SideInputReference> executableStageSideInputs = new HashSet<>();
 
     for (ParallelInstructionNode node :
         Iterables.filter(input.nodes(), ParallelInstructionNode.class)) {
+      ImmutableMap.Builder<String, PCollectionNode> sideInputIds = ImmutableMap.builder();
       ParallelInstruction parallelInstruction = node.getParallelInstruction();
       String ptransformId = "generatedPtransform" + idGenerator.getId();
       ptransformIdToNameContexts.put(
@@ -306,6 +320,7 @@ public class CreateExecutableStageNodeFunction
 
         if (userFnClassName.equals("CombineValuesFn") || userFnClassName.equals("KeyedCombineFn")) {
           transformSpec = transformCombineValuesFnToFunctionSpec(userFnSpec);
+          ptransformIdToPCollectionViews.put(ptransformId, Collections.emptyList());
         } else {
           String parDoPTransformId = getString(userFnSpec, PropertyNames.SERIALIZED_FN);
 
@@ -347,6 +362,33 @@ public class CreateExecutableStageNodeFunction
               userStateIds.add(builder.build());
             }
 
+            // Track each side input's PCollection; these feed executableStageSideInputs below.
+            for (String sideInputTag : parDoPayload.getSideInputsMap().keySet()) {
+              String sideInputPCollectionId = parDoPTransform.getInputsOrThrow(sideInputTag);
+              RunnerApi.PCollection sideInputPCollection =
+                  pipeline.getComponents().getPcollectionsOrThrow(sideInputPCollectionId);
+
+              pTransform.putInputs(sideInputTag, sideInputPCollectionId);
+
+              PCollectionNode pCollectionNode =
+                  PipelineNode.pCollection(sideInputPCollectionId, sideInputPCollection);
+              sideInputIds.put(sideInputTag, pCollectionNode);
+            }
+
+            // Build the Map(ptransformId -> pCollectionView) entries that are
+            // required for constructing an ExecutableStageNode.
+            ImmutableList.Builder<PCollectionView<?>> pcollectionViews = ImmutableList.builder();
+            for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry :
+                parDoPayload.getSideInputsMap().entrySet()) {
+              pcollectionViews.add(
+                  RegisterNodeFunction.transformSideInputForRunner(
+                      pipeline,
+                      parDoPTransform,
+                      sideInputEntry.getKey(),
+                      sideInputEntry.getValue()));
+            }
+            ptransformIdToPCollectionViews.put(ptransformId, pcollectionViews.build());
+
             transformSpec
                 .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                 .setPayload(parDoPayload.toByteString());
@@ -358,6 +400,11 @@ public class CreateExecutableStageNodeFunction
                 .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
                 .setPayload(ByteString.copyFrom(userFnBytes));
           }
+
+          if (parDoInstruction.getSideInputs() != null) {
+            ptransformIdToSideInputInfos.put(
+                ptransformId, forSideInputInfos(parDoInstruction.getSideInputs(), true));
+          }
         }
       } else if (parallelInstruction.getRead() != null) {
         ReadInstruction readInstruction = parallelInstruction.getRead();
@@ -403,6 +450,16 @@ public class CreateExecutableStageNodeFunction
       for (String timerId : timerIds) {
         executableStageTimers.add(TimerReference.of(pTransformNode, timerId));
       }
+
+      ImmutableMap<String, PCollectionNode> sideInputIdToPCollectionNodes = sideInputIds.build();
+      for (String sideInputTag : sideInputIdToPCollectionNodes.keySet()) {
+        SideInputReference sideInputReference =
+            SideInputReference.of(
+                pTransformNode, sideInputTag, sideInputIdToPCollectionNodes.get(sideInputTag));
+        executableStageSideInputs.add(sideInputReference);
+      }
+
+      executableStageTransforms.add(pTransformNode);
     }
 
     if (executableStageInputs.size() != 1) {
@@ -419,9 +476,7 @@ public class CreateExecutableStageNodeFunction
       executableStageEnv = Environments.JAVA_SDK_HARNESS_ENVIRONMENT;
     }
 
-    Set<SideInputReference> executableStageSideInputs = new HashSet<>();
     Set<UserStateReference> executableStageUserStateReference = new HashSet<>();
-
     for (UserStateId userStateId : userStateIds) {
       executableStageUserStateReference.add(
           UserStateReference.fromUserStateId(userStateId, executableStageComponents));
@@ -437,7 +492,11 @@ public class CreateExecutableStageNodeFunction
             executableStageTimers,
             executableStageTransforms,
             executableStageOutputs);
-    return ExecutableStageNode.create(executableStage, ptransformIdToNameContexts.build());
+    return ExecutableStageNode.create(
+        executableStage,
+        ptransformIdToNameContexts.build(),
+        ptransformIdToSideInputInfos.build(),
+        ptransformIdToPCollectionViews.build());
   }
 
   private Environment getEnvironmentFromPTransform(
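
Each side input consumed by a ParDo is recorded as a SideInputReference that
ties together the consuming PTransform node, the local side input tag, and
the side input's PCollection node. Condensed from the loops above (loop
variables assumed):

    // Sketch: one SideInputReference per (consuming transform, side input tag).
    PCollectionNode sideInputNode =
        PipelineNode.pCollection(sideInputPCollectionId, sideInputPCollection);
    executableStageSideInputs.add(
        SideInputReference.of(pTransformNode, sideInputTag, sideInputNode));
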
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
index e4f45dc..7a2d873 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
@@ -319,17 +319,26 @@ public class Nodes {
   public abstract static class ExecutableStageNode extends Node {
     public static ExecutableStageNode create(
         ExecutableStage executableStage,
-        Map<String, NameContext> ptransformIdToPartialNameContextMap) {
+        Map<String, NameContext> ptransformIdToPartialNameContextMap,
+        Map<String, Iterable<SideInputInfo>> ptransformIdToSideInputInfoMap,
+        Map<String, Iterable<PCollectionView<?>>> pTransformIdToPCollectionViewMap) {
       checkNotNull(executableStage);
       checkNotNull(ptransformIdToPartialNameContextMap);
       return new AutoValue_Nodes_ExecutableStageNode(
-          executableStage, ptransformIdToPartialNameContextMap);
+          executableStage,
+          ptransformIdToPartialNameContextMap,
+          ptransformIdToSideInputInfoMap,
+          pTransformIdToPCollectionViewMap);
     }
 
     public abstract ExecutableStage getExecutableStage();
 
     public abstract Map<String, NameContext> getPTransformIdToPartialNameContextMap();
 
+    public abstract Map<String, Iterable<SideInputInfo>> getPTransformIdToSideInputInfoMap();
+
+    public abstract Map<String, Iterable<PCollectionView<?>>> getPTransformIdToPCollectionViewMap();
+
     @Override
     public String toString() {
       // The request may be very large.
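
ExecutableStageNode.create now carries the side input infos and views
alongside the name contexts; note that, as written, only the first two
arguments are null-checked. Usage as in CreateExecutableStageNodeFunction
above:

    // Sketch: the widened ExecutableStageNode factory call.
    ExecutableStageNode node =
        ExecutableStageNode.create(
            executableStage,
            ptransformIdToNameContexts.build(),
            ptransformIdToSideInputInfos.build(),
            ptransformIdToPCollectionViews.build());
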
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
index 46dc8c3..38fcc93 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
@@ -104,7 +104,10 @@ public class SingleEnvironmentInstanceJobBundleFactory implements JobBundleFacto
     try {
       descriptor =
           ProcessBundleDescriptors.fromExecutableStage(
-              idGenerator.getId(), stage, dataService.getApiServiceDescriptor());
+              idGenerator.getId(),
+              stage,
+              dataService.getApiServiceDescriptor(),
+              stateService.getApiServiceDescriptor());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
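
Finally, ProcessBundleDescriptors.fromExecutableStage now receives the state
service's endpoint in addition to the data service's, so the bundle
descriptor tells the SDK harness where to send state (and hence side input)
requests. The updated call shape, as in the diff above:

    // Sketch: descriptors now carry both data and state service endpoints.
    ProcessBundleDescriptors.ExecutableProcessBundleDescriptor descriptor =
        ProcessBundleDescriptors.fromExecutableStage(
            idGenerator.getId(),
            stage,
            dataService.getApiServiceDescriptor(),
            stateService.getApiServiceDescriptor());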