You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2018/07/31 00:47:25 UTC
[beam] branch master updated: [BEAM-5023] Send HarnessId in DataClient (#6066)
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c0b64fb [BEAM-5023] Send HarnessId in DataClient (#6066)
c0b64fb is described below
commit c0b64fb09414e36c67480a8e26a6e0b392e9c6c7
Author: Ankur <an...@users.noreply.github.com>
AuthorDate: Mon Jul 30 17:47:21 2018 -0700
[BEAM-5023] Send HarnessId in DataClient (#6066)
* Send HarnessId in DataClient
The data client needs to include the HarnessId in order to register itself.
* Adding harness id to the channelFactory
---
.../src/main/java/org/apache/beam/fn/harness/FnHarness.java | 4 ++++
.../org/apache/beam/fn/harness/control/BeamFnControlClient.java | 7 +------
2 files changed, 5 insertions(+), 6 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index ec3821d..6ba55bc 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -19,8 +19,10 @@
package org.apache.beam.fn.harness;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
import java.util.EnumMap;
import java.util.List;
+import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.control.RegisterHandler;
@@ -116,6 +118,8 @@ public class FnHarness {
}
OutboundObserverFactory outboundObserverFactory =
HarnessStreamObserverFactories.fromOptions(options);
+ channelFactory =
+ channelFactory.withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(id)));
main(
id,
options,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
index 09d18b3..f1145b8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -20,7 +20,6 @@ package org.apache.beam.fn.harness.control;
import static com.google.common.base.Throwables.getStackTraceAsString;
-import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.EnumMap;
import java.util.Objects;
@@ -79,11 +78,7 @@ public class BeamFnControlClient {
this.bufferedInstructions = new LinkedBlockingDeque<>();
this.outboundObserver =
outboundObserverFactory.outboundObserverFor(
- BeamFnControlGrpc.newStub(
- channelFactory
- .withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(id)))
- .forDescriptor(apiServiceDescriptor))
- ::control,
+ BeamFnControlGrpc.newStub(channelFactory.forDescriptor(apiServiceDescriptor))::control,
new InboundObserver());
this.handlers = handlers;
this.onFinish = new CompletableFuture<>();