You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/01/25 23:43:03 UTC

[beam] branch master updated: [BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ae60a7b  [BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop.
     new 390fb7e  Merge pull request #13695 from [BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop
ae60a7b is described below

commit ae60a7bd89a9917920b9bd07bce798848e5f6fa4
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Wed Jan 6 10:02:56 2021 -0800

    [BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop.
---
 .../harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java   | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 89b0a7c..3a7b741 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -36,6 +36,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse.Builde
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
@@ -251,11 +252,14 @@ public class FnHarness {
 
       JvmInitializers.runBeforeProcessing(options);
 
+      ExecutionStateSampler.instance().start();
+
       LOG.info("Entering instruction processing loop");
       control.processInstructionRequests(executorService);
       processBundleHandler.shutdown();
     } finally {
       System.out.println("Shutting SDK harness down.");
+      ExecutionStateSampler.instance().stop();
       executorService.shutdown();
     }
   }