You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/16 21:48:59 UTC

[1/2] beam git commit: [BEAM-1347] Plumb through a yet to be created state client through PTransformRunnerFactory

Repository: beam
Updated Branches:
  refs/heads/master d0deb6cc8 -> 8503adbbc


[BEAM-1347] Plumb through a yet to be created state client through PTransformRunnerFactory


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cfb79883
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cfb79883
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cfb79883

Branch: refs/heads/master
Commit: cfb798830042b28eaf343103724779c90092535c
Parents: d0deb6c
Author: Luke Cwik <lc...@google.com>
Authored: Fri Jul 7 14:02:58 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 16 14:48:07 2017 -0700

----------------------------------------------------------------------
 .../beam/fn/harness/BeamFnDataReadRunner.java   |  2 ++
 .../beam/fn/harness/BeamFnDataWriteRunner.java  |  2 ++
 .../beam/fn/harness/BoundedSourceRunner.java    |  2 ++
 .../apache/beam/fn/harness/FnApiDoFnRunner.java |  8 +++++++
 .../org/apache/beam/fn/harness/FnHarness.java   |  7 ++++--
 .../fn/harness/PTransformRunnerFactory.java     |  5 +++-
 .../harness/control/ProcessBundleHandler.java   | 11 +++++++--
 .../fn/harness/state/BeamFnStateClient.java     | 25 ++++++++++++++++++++
 .../beam/fn/harness/state/package-info.java     | 22 +++++++++++++++++
 .../fn/harness/BeamFnDataReadRunnerTest.java    |  1 +
 .../fn/harness/BeamFnDataWriteRunnerTest.java   |  1 +
 .../fn/harness/BoundedSourceRunnerTest.java     |  1 +
 .../beam/fn/harness/FnApiDoFnRunnerTest.java    |  1 +
 .../control/ProcessBundleHandlerTest.java       |  9 +++++++
 14 files changed, 92 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index df0e5a2..f254ec4 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -33,6 +33,7 @@ import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
@@ -77,6 +78,7 @@ public class BeamFnDataReadRunner<OutputT> {
     public BeamFnDataReadRunner<OutputT> createRunnerForPTransform(
         PipelineOptions pipelineOptions,
         BeamFnDataClient beamFnDataClient,
+        BeamFnStateClient beamFnStateClient,
         String pTransformId,
         RunnerApi.PTransform pTransform,
         Supplier<String> processBundleInstructionId,

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index 48b450a..179a228 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -32,6 +32,7 @@ import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
@@ -72,6 +73,7 @@ public class BeamFnDataWriteRunner<InputT> {
     public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(
         PipelineOptions pipelineOptions,
         BeamFnDataClient beamFnDataClient,
+        BeamFnStateClient beamFnStateClient,
         String pTransformId,
         RunnerApi.PTransform pTransform,
         Supplier<String> processBundleInstructionId,

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
index 5f6509f..c4daa0f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
@@ -31,6 +31,7 @@ import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source.Reader;
@@ -64,6 +65,7 @@ public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT>
     public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform(
         PipelineOptions pipelineOptions,
         BeamFnDataClient beamFnDataClient,
+        BeamFnStateClient beamFnStateClient,
         String pTransformId,
         RunnerApi.PTransform pTransform,
         Supplier<String> processBundleInstructionId,

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 86168f9..d325bb2 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
@@ -48,6 +49,8 @@ import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -86,6 +89,7 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp
     public DoFnRunner<InputT, OutputT> createRunnerForPTransform(
         PipelineOptions pipelineOptions,
         BeamFnDataClient beamFnDataClient,
+        BeamFnStateClient beamFnStateClient,
         String pTransformId,
         RunnerApi.PTransform pTransform,
         Supplier<String> processBundleInstructionId,
@@ -165,6 +169,8 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp
   private final StartBundleContext startBundleContext;
   private final ProcessBundleContext processBundleContext;
   private final FinishBundleContext finishBundleContext;
+  private final WindowingStrategy windowingStrategy;
+  private final DoFnSignature doFnSignature;
 
   /**
    * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}.
@@ -186,6 +192,8 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp
     this.doFn = doFn;
     this.mainOutputConsumers = mainOutputConsumers;
     this.outputMap = outputMap;
+    this.windowingStrategy = windowingStrategy;
+    this.doFnSignature = DoFnSignatures.signatureForDoFn(doFn);
     this.doFnInvoker = DoFnInvokers.invokerFor(doFn);
     this.startBundleContext = new StartBundleContext();
     this.processBundleContext = new ProcessBundleContext();

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 05ab44f..a79ecca 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -109,8 +109,11 @@ public class FnHarness {
       BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient(
           options, channelFactory::forDescriptor, streamObserverFactory::from);
 
-      ProcessBundleHandler processBundleHandler =
-          new ProcessBundleHandler(options, fnApiRegistry::getById, beamFnDataMultiplexer);
+      ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(
+          options,
+          fnApiRegistry::getById,
+          beamFnDataMultiplexer,
+          null /* beamFnStateClient */);
       handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER,
           fnApiRegistry::register);
       handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE,

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
index 7cf0610..4ef56d8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -40,7 +41,8 @@ public interface PTransformRunnerFactory<T> {
    * element processing, or during execution of start/finish.
    *
    * @param pipelineOptions Pipeline options
-   * @param beamFnDataClient
+   * @param beamFnDataClient A client for handling inbound and outbound data streams.
+   * @param beamFnStateClient A client for handling state requests.
    * @param pTransformId The id of the PTransform.
    * @param pTransform The PTransform definition.
    * @param processBundleInstructionId A supplier containing the active process bundle instruction
@@ -58,6 +60,7 @@ public interface PTransformRunnerFactory<T> {
   T createRunnerForPTransform(
       PipelineOptions pipelineOptions,
       BeamFnDataClient beamFnDataClient,
+      BeamFnStateClient beamFnStateClient,
       String pTransformId,
       RunnerApi.PTransform pTransform,
       Supplier<String> processBundleInstructionId,

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 1e73570..67c4d67 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -39,6 +39,7 @@ import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -83,6 +84,7 @@ public class ProcessBundleHandler {
   private final PipelineOptions options;
   private final Function<String, Message> fnApiRegistry;
   private final BeamFnDataClient beamFnDataClient;
+  private final BeamFnStateClient beamFnStateClient;
   private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
   private final PTransformRunnerFactory defaultPTransformRunnerFactory;
 
@@ -90,8 +92,9 @@ public class ProcessBundleHandler {
   public ProcessBundleHandler(
       PipelineOptions options,
       Function<String, Message> fnApiRegistry,
-      BeamFnDataClient beamFnDataClient) {
-    this(options, fnApiRegistry, beamFnDataClient, REGISTERED_RUNNER_FACTORIES);
+      BeamFnDataClient beamFnDataClient,
+      BeamFnStateClient beamFnStateClient) {
+    this(options, fnApiRegistry, beamFnDataClient, beamFnStateClient, REGISTERED_RUNNER_FACTORIES);
   }
 
   @VisibleForTesting
@@ -99,16 +102,19 @@ public class ProcessBundleHandler {
       PipelineOptions options,
       Function<String, Message> fnApiRegistry,
       BeamFnDataClient beamFnDataClient,
+      BeamFnStateClient beamFnStateClient,
       Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap) {
     this.options = options;
     this.fnApiRegistry = fnApiRegistry;
     this.beamFnDataClient = beamFnDataClient;
+    this.beamFnStateClient = beamFnStateClient;
     this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap;
     this.defaultPTransformRunnerFactory = new PTransformRunnerFactory<Object>() {
       @Override
       public Object createRunnerForPTransform(
           PipelineOptions pipelineOptions,
           BeamFnDataClient beamFnDataClient,
+          BeamFnStateClient beamFnStateClient,
           String pTransformId,
           RunnerApi.PTransform pTransform,
           Supplier<String> processBundleInstructionId,
@@ -162,6 +168,7 @@ public class ProcessBundleHandler {
         .createRunnerForPTransform(
             options,
             beamFnDataClient,
+            beamFnStateClient,
             pTransformId,
             pTransform,
             processBundleInstructionId,

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
new file mode 100644
index 0000000..8150530
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+/**
+ * TODO: Define interface required for handling state calls.
+ */
+public interface BeamFnStateClient {
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java
new file mode 100644
index 0000000..feadb7d
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * State client and state caching.
+ */
+package org.apache.beam.fn.harness.state;

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index 92e6088..e5b4968 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -141,6 +141,7 @@ public class BeamFnDataReadRunnerTest {
     new BeamFnDataReadRunner.Factory<String>().createRunnerForPTransform(
         PipelineOptionsFactory.create(),
         mockBeamFnDataClient,
+        null /* beamFnStateClient */,
         "pTransformId",
         pTransform,
         Suppliers.ofInstance(bundleId)::get,

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index ffa3a2d..c4b717a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -127,6 +127,7 @@ public class BeamFnDataWriteRunnerTest {
     new BeamFnDataWriteRunner.Factory<String>().createRunnerForPTransform(
         PipelineOptionsFactory.create(),
         mockBeamFnDataClient,
+        null /* beamFnStateClient */,
         "ptransformId",
         pTransform,
         Suppliers.ofInstance(bundleId)::get,

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
index b9f22e8..135495a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
@@ -140,6 +140,7 @@ public class BoundedSourceRunnerTest {
     new BoundedSourceRunner.Factory<>().createRunnerForPTransform(
         PipelineOptionsFactory.create(),
         null /* beamFnDataClient */,
+        null /* beamFnStateClient */,
         "pTransformId",
         pTransform,
         Suppliers.ofInstance("57L")::get,

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index efa8fcf..ebec608 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -126,6 +126,7 @@ public class FnApiDoFnRunnerTest {
     new FnApiDoFnRunner.Factory<>().createRunnerForPTransform(
         PipelineOptionsFactory.create(),
         null /* beamFnDataClient */,
+        null /* beamFnStateClient */,
         pTransformId,
         pTransform,
         Suppliers.ofInstance("57L")::get,

http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 0a94b5b..d0e1faf 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.fn.harness.PTransformRunnerFactory;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.v1.BeamFnApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -91,6 +92,7 @@ public class ProcessBundleHandlerTest {
       public Object createRunnerForPTransform(
           PipelineOptions pipelineOptions,
           BeamFnDataClient beamFnDataClient,
+          BeamFnStateClient beamFnStateClient,
           String pTransformId,
           RunnerApi.PTransform pTransform,
           Supplier<String> processBundleInstructionId,
@@ -115,6 +117,7 @@ public class ProcessBundleHandlerTest {
         PipelineOptionsFactory.create(),
         fnApiRegistry::get,
         beamFnDataClient,
+        null /* beamFnStateClient */,
         ImmutableMap.of(
             DATA_INPUT_URN, startFinishRecorder,
             DATA_OUTPUT_URN, startFinishRecorder));
@@ -147,11 +150,13 @@ public class ProcessBundleHandlerTest {
         PipelineOptionsFactory.create(),
         fnApiRegistry::get,
         beamFnDataClient,
+        null /* beamFnStateClient */,
         ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() {
           @Override
           public Object createRunnerForPTransform(
               PipelineOptions pipelineOptions,
               BeamFnDataClient beamFnDataClient,
+              BeamFnStateClient beamFnStateClient,
               String pTransformId,
               RunnerApi.PTransform pTransform,
               Supplier<String> processBundleInstructionId,
@@ -185,11 +190,13 @@ public class ProcessBundleHandlerTest {
         PipelineOptionsFactory.create(),
         fnApiRegistry::get,
         beamFnDataClient,
+        null /* beamFnStateClient */,
         ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() {
           @Override
           public Object createRunnerForPTransform(
               PipelineOptions pipelineOptions,
               BeamFnDataClient beamFnDataClient,
+              BeamFnStateClient beamFnStateClient,
               String pTransformId,
               RunnerApi.PTransform pTransform,
               Supplier<String> processBundleInstructionId,
@@ -224,11 +231,13 @@ public class ProcessBundleHandlerTest {
         PipelineOptionsFactory.create(),
         fnApiRegistry::get,
         beamFnDataClient,
+        null /* beamFnStateClient */,
         ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() {
           @Override
           public Object createRunnerForPTransform(
               PipelineOptions pipelineOptions,
               BeamFnDataClient beamFnDataClient,
+              BeamFnStateClient beamFnStateClient,
               String pTransformId,
               RunnerApi.PTransform pTransform,
               Supplier<String> processBundleInstructionId,


[2/2] beam git commit: [BEAM-1347] Plumb through a yet to be created state client through PTransformRunnerFactory

Posted by lc...@apache.org.
[BEAM-1347] Plumb through a yet to be created state client through PTransformRunnerFactory

This closes #3723


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8503adbb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8503adbb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8503adbb

Branch: refs/heads/master
Commit: 8503adbbc3a590cd0dc2939f6a45d335682a9442
Parents: d0deb6c cfb7988
Author: Luke Cwik <lc...@google.com>
Authored: Wed Aug 16 14:48:45 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 16 14:48:45 2017 -0700

----------------------------------------------------------------------
 .../beam/fn/harness/BeamFnDataReadRunner.java   |  2 ++
 .../beam/fn/harness/BeamFnDataWriteRunner.java  |  2 ++
 .../beam/fn/harness/BoundedSourceRunner.java    |  2 ++
 .../apache/beam/fn/harness/FnApiDoFnRunner.java |  8 +++++++
 .../org/apache/beam/fn/harness/FnHarness.java   |  7 ++++--
 .../fn/harness/PTransformRunnerFactory.java     |  5 +++-
 .../harness/control/ProcessBundleHandler.java   | 11 +++++++--
 .../fn/harness/state/BeamFnStateClient.java     | 25 ++++++++++++++++++++
 .../beam/fn/harness/state/package-info.java     | 22 +++++++++++++++++
 .../fn/harness/BeamFnDataReadRunnerTest.java    |  1 +
 .../fn/harness/BeamFnDataWriteRunnerTest.java   |  1 +
 .../fn/harness/BoundedSourceRunnerTest.java     |  1 +
 .../beam/fn/harness/FnApiDoFnRunnerTest.java    |  1 +
 .../control/ProcessBundleHandlerTest.java       |  9 +++++++
 14 files changed, 92 insertions(+), 5 deletions(-)
----------------------------------------------------------------------