You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2018/07/31 00:47:25 UTC

[beam] branch master updated: [BEAM-5023] Send HarnessId in DataClient (#6066)

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c0b64fb  [BEAM-5023] Send HarnessId in DataClient (#6066)
c0b64fb is described below

commit c0b64fb09414e36c67480a8e26a6e0b392e9c6c7
Author: Ankur <an...@users.noreply.github.com>
AuthorDate: Mon Jul 30 17:47:21 2018 -0700

    [BEAM-5023] Send HarnessId in DataClient (#6066)
    
    * Send HarnessId in DataClient
    
    The data client needs to contain HarnessId to register itself
    
    * Adding harness id to the channelFactory
---
 .../src/main/java/org/apache/beam/fn/harness/FnHarness.java        | 4 ++++
 .../org/apache/beam/fn/harness/control/BeamFnControlClient.java    | 7 +------
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index ec3821d..6ba55bc 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -19,8 +19,10 @@
 package org.apache.beam.fn.harness;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 import java.util.EnumMap;
 import java.util.List;
+import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
 import org.apache.beam.fn.harness.control.BeamFnControlClient;
 import org.apache.beam.fn.harness.control.ProcessBundleHandler;
 import org.apache.beam.fn.harness.control.RegisterHandler;
@@ -116,6 +118,8 @@ public class FnHarness {
     }
     OutboundObserverFactory outboundObserverFactory =
         HarnessStreamObserverFactories.fromOptions(options);
+    channelFactory =
+        channelFactory.withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(id)));
     main(
         id,
         options,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
index 09d18b3..f1145b8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -20,7 +20,6 @@ package org.apache.beam.fn.harness.control;
 
 import static com.google.common.base.Throwables.getStackTraceAsString;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.EnumMap;
 import java.util.Objects;
@@ -79,11 +78,7 @@ public class BeamFnControlClient {
     this.bufferedInstructions = new LinkedBlockingDeque<>();
     this.outboundObserver =
         outboundObserverFactory.outboundObserverFor(
-            BeamFnControlGrpc.newStub(
-                    channelFactory
-                        .withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(id)))
-                        .forDescriptor(apiServiceDescriptor))
-                ::control,
+            BeamFnControlGrpc.newStub(channelFactory.forDescriptor(apiServiceDescriptor))::control,
             new InboundObserver());
     this.handlers = handlers;
     this.onFinish = new CompletableFuture<>();