You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2021/01/25 23:43:03 UTC
[beam] branch master updated: [BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ae60a7b [BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop.
new 390fb7e Merge pull request #13695 from [BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop
ae60a7b is described below
commit ae60a7bd89a9917920b9bd07bce798848e5f6fa4
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Wed Jan 6 10:02:56 2021 -0800
[BEAM-11581] Start ExecutionStateSampler in Java SDK harness before dispatch loop.
---
.../harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 89b0a7c..3a7b741 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -36,6 +36,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse.Builde
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
@@ -251,11 +252,14 @@ public class FnHarness {
JvmInitializers.runBeforeProcessing(options);
+ ExecutionStateSampler.instance().start();
+
LOG.info("Entering instruction processing loop");
control.processInstructionRequests(executorService);
processBundleHandler.shutdown();
} finally {
System.out.println("Shutting SDK harness down.");
+ ExecutionStateSampler.instance().stop();
executorService.shutdown();
}
}