You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/08/16 21:48:59 UTC
[1/2] beam git commit: [BEAM-1347] Plumb a yet-to-be-created
state client through PTransformRunnerFactory
Repository: beam
Updated Branches:
refs/heads/master d0deb6cc8 -> 8503adbbc
[BEAM-1347] Plumb a yet-to-be-created state client through PTransformRunnerFactory
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cfb79883
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cfb79883
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cfb79883
Branch: refs/heads/master
Commit: cfb798830042b28eaf343103724779c90092535c
Parents: d0deb6c
Author: Luke Cwik <lc...@google.com>
Authored: Fri Jul 7 14:02:58 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 16 14:48:07 2017 -0700
----------------------------------------------------------------------
.../beam/fn/harness/BeamFnDataReadRunner.java | 2 ++
.../beam/fn/harness/BeamFnDataWriteRunner.java | 2 ++
.../beam/fn/harness/BoundedSourceRunner.java | 2 ++
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 8 +++++++
.../org/apache/beam/fn/harness/FnHarness.java | 7 ++++--
.../fn/harness/PTransformRunnerFactory.java | 5 +++-
.../harness/control/ProcessBundleHandler.java | 11 +++++++--
.../fn/harness/state/BeamFnStateClient.java | 25 ++++++++++++++++++++
.../beam/fn/harness/state/package-info.java | 22 +++++++++++++++++
.../fn/harness/BeamFnDataReadRunnerTest.java | 1 +
.../fn/harness/BeamFnDataWriteRunnerTest.java | 1 +
.../fn/harness/BoundedSourceRunnerTest.java | 1 +
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 1 +
.../control/ProcessBundleHandlerTest.java | 9 +++++++
14 files changed, 92 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index df0e5a2..f254ec4 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -33,6 +33,7 @@ import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
@@ -77,6 +78,7 @@ public class BeamFnDataReadRunner<OutputT> {
public BeamFnDataReadRunner<OutputT> createRunnerForPTransform(
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient,
String pTransformId,
RunnerApi.PTransform pTransform,
Supplier<String> processBundleInstructionId,
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index 48b450a..179a228 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -32,6 +32,7 @@ import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
@@ -72,6 +73,7 @@ public class BeamFnDataWriteRunner<InputT> {
public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient,
String pTransformId,
RunnerApi.PTransform pTransform,
Supplier<String> processBundleInstructionId,
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
index 5f6509f..c4daa0f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java
@@ -31,6 +31,7 @@ import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source.Reader;
@@ -64,6 +65,7 @@ public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT>
public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform(
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient,
String pTransformId,
RunnerApi.PTransform pTransform,
Supplier<String> processBundleInstructionId,
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 86168f9..d325bb2 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
@@ -48,6 +49,8 @@ import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -86,6 +89,7 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp
public DoFnRunner<InputT, OutputT> createRunnerForPTransform(
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient,
String pTransformId,
RunnerApi.PTransform pTransform,
Supplier<String> processBundleInstructionId,
@@ -165,6 +169,8 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp
private final StartBundleContext startBundleContext;
private final ProcessBundleContext processBundleContext;
private final FinishBundleContext finishBundleContext;
+ private final WindowingStrategy windowingStrategy;
+ private final DoFnSignature doFnSignature;
/**
* The lifetime of this member is only valid during {@link #processElement(WindowedValue)}.
@@ -186,6 +192,8 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp
this.doFn = doFn;
this.mainOutputConsumers = mainOutputConsumers;
this.outputMap = outputMap;
+ this.windowingStrategy = windowingStrategy;
+ this.doFnSignature = DoFnSignatures.signatureForDoFn(doFn);
this.doFnInvoker = DoFnInvokers.invokerFor(doFn);
this.startBundleContext = new StartBundleContext();
this.processBundleContext = new ProcessBundleContext();
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 05ab44f..a79ecca 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -109,8 +109,11 @@ public class FnHarness {
BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient(
options, channelFactory::forDescriptor, streamObserverFactory::from);
- ProcessBundleHandler processBundleHandler =
- new ProcessBundleHandler(options, fnApiRegistry::getById, beamFnDataMultiplexer);
+ ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(
+ options,
+ fnApiRegistry::getById,
+ beamFnDataMultiplexer,
+ null /* beamFnStateClient */);
handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER,
fnApiRegistry::register);
handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE,
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
index 7cf0610..4ef56d8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
@@ -40,7 +41,8 @@ public interface PTransformRunnerFactory<T> {
* element processing, or during execution of start/finish.
*
* @param pipelineOptions Pipeline options
- * @param beamFnDataClient
+ * @param beamFnDataClient A client for handling inbound and outbound data streams.
+ * @param beamFnStateClient A client for handling state requests.
* @param pTransformId The id of the PTransform.
* @param pTransform The PTransform definition.
* @param processBundleInstructionId A supplier containing the active process bundle instruction
@@ -58,6 +60,7 @@ public interface PTransformRunnerFactory<T> {
T createRunnerForPTransform(
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient,
String pTransformId,
RunnerApi.PTransform pTransform,
Supplier<String> processBundleInstructionId,
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 1e73570..67c4d67 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -39,6 +39,7 @@ import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -83,6 +84,7 @@ public class ProcessBundleHandler {
private final PipelineOptions options;
private final Function<String, Message> fnApiRegistry;
private final BeamFnDataClient beamFnDataClient;
+ private final BeamFnStateClient beamFnStateClient;
private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
private final PTransformRunnerFactory defaultPTransformRunnerFactory;
@@ -90,8 +92,9 @@ public class ProcessBundleHandler {
public ProcessBundleHandler(
PipelineOptions options,
Function<String, Message> fnApiRegistry,
- BeamFnDataClient beamFnDataClient) {
- this(options, fnApiRegistry, beamFnDataClient, REGISTERED_RUNNER_FACTORIES);
+ BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient) {
+ this(options, fnApiRegistry, beamFnDataClient, beamFnStateClient, REGISTERED_RUNNER_FACTORIES);
}
@VisibleForTesting
@@ -99,16 +102,19 @@ public class ProcessBundleHandler {
PipelineOptions options,
Function<String, Message> fnApiRegistry,
BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient,
Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap) {
this.options = options;
this.fnApiRegistry = fnApiRegistry;
this.beamFnDataClient = beamFnDataClient;
+ this.beamFnStateClient = beamFnStateClient;
this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap;
this.defaultPTransformRunnerFactory = new PTransformRunnerFactory<Object>() {
@Override
public Object createRunnerForPTransform(
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beanFnStateClient,
String pTransformId,
RunnerApi.PTransform pTransform,
Supplier<String> processBundleInstructionId,
@@ -162,6 +168,7 @@ public class ProcessBundleHandler {
.createRunnerForPTransform(
options,
beamFnDataClient,
+ beamFnStateClient,
pTransformId,
pTransform,
processBundleInstructionId,
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
new file mode 100644
index 0000000..8150530
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+/**
+ * TODO: Define interface required for handling state calls.
+ */
+public interface BeamFnStateClient {
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java
new file mode 100644
index 0000000..feadb7d
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * State client and state caching.
+ */
+package org.apache.beam.fn.harness.state;
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index 92e6088..e5b4968 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -141,6 +141,7 @@ public class BeamFnDataReadRunnerTest {
new BeamFnDataReadRunner.Factory<String>().createRunnerForPTransform(
PipelineOptionsFactory.create(),
mockBeamFnDataClient,
+ null /* beamFnStateClient */,
"pTransformId",
pTransform,
Suppliers.ofInstance(bundleId)::get,
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index ffa3a2d..c4b717a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -127,6 +127,7 @@ public class BeamFnDataWriteRunnerTest {
new BeamFnDataWriteRunner.Factory<String>().createRunnerForPTransform(
PipelineOptionsFactory.create(),
mockBeamFnDataClient,
+ null /* beamFnStateClient */,
"ptransformId",
pTransform,
Suppliers.ofInstance(bundleId)::get,
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
index b9f22e8..135495a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java
@@ -140,6 +140,7 @@ public class BoundedSourceRunnerTest {
new BoundedSourceRunner.Factory<>().createRunnerForPTransform(
PipelineOptionsFactory.create(),
null /* beamFnDataClient */,
+ null /* beamFnStateClient */,
"pTransformId",
pTransform,
Suppliers.ofInstance("57L")::get,
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index efa8fcf..ebec608 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -126,6 +126,7 @@ public class FnApiDoFnRunnerTest {
new FnApiDoFnRunner.Factory<>().createRunnerForPTransform(
PipelineOptionsFactory.create(),
null /* beamFnDataClient */,
+ null /* beamFnStateClient */,
pTransformId,
pTransform,
Suppliers.ofInstance("57L")::get,
http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 0a94b5b..d0e1faf 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
+import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -91,6 +92,7 @@ public class ProcessBundleHandlerTest {
public Object createRunnerForPTransform(
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient,
String pTransformId,
RunnerApi.PTransform pTransform,
Supplier<String> processBundleInstructionId,
@@ -115,6 +117,7 @@ public class ProcessBundleHandlerTest {
PipelineOptionsFactory.create(),
fnApiRegistry::get,
beamFnDataClient,
+ null /* beamFnStateClient */,
ImmutableMap.of(
DATA_INPUT_URN, startFinishRecorder,
DATA_OUTPUT_URN, startFinishRecorder));
@@ -147,11 +150,13 @@ public class ProcessBundleHandlerTest {
PipelineOptionsFactory.create(),
fnApiRegistry::get,
beamFnDataClient,
+ null /* beamFnStateClient */,
ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() {
@Override
public Object createRunnerForPTransform(
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient,
String pTransformId,
RunnerApi.PTransform pTransform,
Supplier<String> processBundleInstructionId,
@@ -185,11 +190,13 @@ public class ProcessBundleHandlerTest {
PipelineOptionsFactory.create(),
fnApiRegistry::get,
beamFnDataClient,
+ null /* beamFnStateClient */,
ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() {
@Override
public Object createRunnerForPTransform(
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient,
String pTransformId,
RunnerApi.PTransform pTransform,
Supplier<String> processBundleInstructionId,
@@ -224,11 +231,13 @@ public class ProcessBundleHandlerTest {
PipelineOptionsFactory.create(),
fnApiRegistry::get,
beamFnDataClient,
+ null /* beamFnStateClient */,
ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() {
@Override
public Object createRunnerForPTransform(
PipelineOptions pipelineOptions,
BeamFnDataClient beamFnDataClient,
+ BeamFnStateClient beamFnStateClient,
String pTransformId,
RunnerApi.PTransform pTransform,
Supplier<String> processBundleInstructionId,
[2/2] beam git commit: [BEAM-1347] Plumb a yet-to-be-created
state client through PTransformRunnerFactory
Posted by lc...@apache.org.
[BEAM-1347] Plumb a yet-to-be-created state client through PTransformRunnerFactory
This closes #3723
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8503adbb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8503adbb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8503adbb
Branch: refs/heads/master
Commit: 8503adbbc3a590cd0dc2939f6a45d335682a9442
Parents: d0deb6c cfb7988
Author: Luke Cwik <lc...@google.com>
Authored: Wed Aug 16 14:48:45 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 16 14:48:45 2017 -0700
----------------------------------------------------------------------
.../beam/fn/harness/BeamFnDataReadRunner.java | 2 ++
.../beam/fn/harness/BeamFnDataWriteRunner.java | 2 ++
.../beam/fn/harness/BoundedSourceRunner.java | 2 ++
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 8 +++++++
.../org/apache/beam/fn/harness/FnHarness.java | 7 ++++--
.../fn/harness/PTransformRunnerFactory.java | 5 +++-
.../harness/control/ProcessBundleHandler.java | 11 +++++++--
.../fn/harness/state/BeamFnStateClient.java | 25 ++++++++++++++++++++
.../beam/fn/harness/state/package-info.java | 22 +++++++++++++++++
.../fn/harness/BeamFnDataReadRunnerTest.java | 1 +
.../fn/harness/BeamFnDataWriteRunnerTest.java | 1 +
.../fn/harness/BoundedSourceRunnerTest.java | 1 +
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 1 +
.../control/ProcessBundleHandlerTest.java | 9 +++++++
14 files changed, 92 insertions(+), 5 deletions(-)
----------------------------------------------------------------------