You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2021/04/13 19:43:27 UTC
[beam] branch master updated: [BEAM-11945] Add debug capture to SDK
harness (#14197)
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 67badf6 [BEAM-11945] Add debug capture to SDK harness (#14197)
67badf6 is described below
commit 67badf644eddacbbda1d4c2131c22dda1ee29d30
Author: kileys <ki...@google.com>
AuthorDate: Tue Apr 13 12:42:40 2021 -0700
[BEAM-11945] Add debug capture to SDK harness (#14197)
---
.../environment/EmbeddedEnvironmentFactory.java | 1 +
.../fnexecution/control/RemoteExecutionTest.java | 1 +
.../runners/portability/ExternalWorkerService.java | 3 +-
.../java/org/apache/beam/fn/harness/FnHarness.java | 32 +-
.../fn/harness/control/ProcessBundleHandler.java | 12 +-
.../beam/fn/harness/status/BeamFnStatusClient.java | 230 ++++++++
.../beam/fn/harness/status/MemoryMonitor.java | 635 +++++++++++++++++++++
.../beam/fn/harness/status/package-info.java | 20 +
.../harness/control/ProcessBundleHandlerTest.java | 2 +-
.../fn/harness/status/BeamFnStatusClientTest.java | 142 +++++
.../beam/fn/harness/status/MemoryMonitorTest.java | 166 ++++++
11 files changed, 1238 insertions(+), 6 deletions(-)
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
index 73e14f6..0b617e9 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
@@ -102,6 +102,7 @@ public class EmbeddedEnvironmentFactory implements EnvironmentFactory {
options,
loggingServer.getApiServiceDescriptor(),
controlServer.getApiServiceDescriptor(),
+ null,
InProcessManagedChannelFactory.create(),
OutboundObserverFactory.clientDirect());
} catch (NoClassDefFoundError e) {
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 43e6c16..d37c41a 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -214,6 +214,7 @@ public class RemoteExecutionTest implements Serializable {
PipelineOptionsFactory.create(),
loggingServer.getApiServiceDescriptor(),
controlServer.getApiServiceDescriptor(),
+ null,
InProcessManagedChannelFactory.create(),
OutboundObserverFactory.clientDirect());
} catch (Exception e) {
diff --git a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
index c4311a3..bbb7760 100644
--- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
+++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
@@ -59,7 +59,8 @@ public class ExternalWorkerService extends BeamFnExternalWorkerPoolImplBase impl
request.getWorkerId(),
options,
request.getLoggingEndpoint(),
- request.getControlEndpoint());
+ request.getControlEndpoint(),
+ null);
LOG.info("Successfully started worker {}.", request.getWorkerId());
} catch (Exception exn) {
LOG.error(String.format("Failed to start worker %s.", request.getWorkerId()), exn);
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 3a7b741..34ab171 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import javax.annotation.Nullable;
import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
@@ -29,6 +30,7 @@ import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
+import org.apache.beam.fn.harness.status.BeamFnStatusClient;
import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
@@ -81,6 +83,7 @@ public class FnHarness {
private static final String HARNESS_ID = "HARNESS_ID";
private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
+ private static final String STATUS_API_SERVICE_DESCRIPTOR = "STATUS_API_SERVICE_DESCRIPTOR";
private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);
@@ -105,6 +108,8 @@ public class FnHarness {
"Logging location %s%n", environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
System.out.format(
"Control location %s%n", environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
+ System.out.format(
+ "Status location %s%n", environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
System.out.format("Pipeline options %s%n", environmentVarGetter.apply(PIPELINE_OPTIONS));
String id = environmentVarGetter.apply(HARNESS_ID);
@@ -117,7 +122,16 @@ public class FnHarness {
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
- main(id, options, loggingApiServiceDescriptor, controlApiServiceDescriptor);
+ Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
+ environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
+ ? null
+ : getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
+ main(
+ id,
+ options,
+ loggingApiServiceDescriptor,
+ controlApiServiceDescriptor,
+ statusApiServiceDescriptor);
}
/**
@@ -128,13 +142,15 @@ public class FnHarness {
* @param options The options for this pipeline
* @param loggingApiServiceDescriptor
* @param controlApiServiceDescriptor
+ * @param statusApiServiceDescriptor
* @throws Exception
*/
public static void main(
String id,
PipelineOptions options,
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
- Endpoints.ApiServiceDescriptor controlApiServiceDescriptor)
+ Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
+ @Nullable Endpoints.ApiServiceDescriptor statusApiServiceDescriptor)
throws Exception {
ManagedChannelFactory channelFactory;
List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
@@ -152,6 +168,7 @@ public class FnHarness {
options,
loggingApiServiceDescriptor,
controlApiServiceDescriptor,
+ statusApiServiceDescriptor,
channelFactory,
outboundObserverFactory);
}
@@ -164,6 +181,7 @@ public class FnHarness {
* @param options The options for this pipeline
* @param loggingApiServiceDescriptor
* @param controlApiServiceDescriptor
+ * @param statusApiServiceDescriptor
* @param channelFactory
* @param outboundObserverFactory
* @throws Exception
@@ -173,6 +191,7 @@ public class FnHarness {
PipelineOptions options,
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
+ Endpoints.ApiServiceDescriptor statusApiServiceDescriptor,
ManagedChannelFactory channelFactory,
OutboundObserverFactory outboundObserverFactory)
throws Exception {
@@ -229,6 +248,15 @@ public class FnHarness {
beamFnDataMultiplexer,
beamFnStateGrpcClientCache,
finalizeBundleHandler);
+
+ if (statusApiServiceDescriptor != null) {
+ new BeamFnStatusClient(
+ statusApiServiceDescriptor,
+ channelFactory::forDescriptor,
+ processBundleHandler.getBundleProcessorCache(),
+ options);
+ }
+
// TODO(BEAM-9729): Remove once runners no longer send this instruction.
handlers.put(
BeamFnApi.InstructionRequest.RequestCase.REGISTER,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 99b794e..0c7f39d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -538,6 +538,10 @@ public class ProcessBundleHandler {
return bundleProcessor;
}
+ public BundleProcessorCache getBundleProcessorCache() {
+ return bundleProcessorCache;
+ }
+
/** A cache for {@link BundleProcessor}s. */
public static class BundleProcessorCache {
@@ -579,6 +583,10 @@ public class ProcessBundleHandler {
return ImmutableMap.copyOf(cachedBundleProcessors.asMap());
}
+ public Map<String, BundleProcessor> getActiveBundleProcessors() {
+ return ImmutableMap.copyOf(activeBundleProcessors);
+ }
+
/**
* Get a {@link BundleProcessor} from the cache if it's available. Otherwise, create one using
* the specified {@code bundleProcessorSupplier}. The {@link BundleProcessor} that is returned
@@ -607,7 +615,7 @@ public class ProcessBundleHandler {
* Finds an active bundle processor for the specified {@code instructionId} or null if one could
* not be found.
*/
- BundleProcessor find(String instructionId) {
+ public BundleProcessor find(String instructionId) {
return activeBundleProcessors.get(instructionId);
}
@@ -686,7 +694,7 @@ public class ProcessBundleHandler {
abstract MetricsContainerStepMap getMetricsContainerRegistry();
- abstract ExecutionStateTracker getStateTracker();
+ public abstract ExecutionStateTracker getStateTracker();
abstract HandleStateCallsForBundle getBeamFnStateClient();
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java
new file mode 100644
index 0000000..4c01c04
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.status;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.StringJoiner;
+import java.util.function.Function;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessorCache;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.WorkerStatusRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.WorkerStatusResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BeamFnStatusClient {
+ private final StreamObserver<WorkerStatusResponse> outboundObserver;
+ private final BundleProcessorCache processBundleCache;
+ private static final Logger LOG = LoggerFactory.getLogger(BeamFnStatusClient.class);
+ private final MemoryMonitor memoryMonitor;
+
+ public BeamFnStatusClient(
+ ApiServiceDescriptor apiServiceDescriptor,
+ Function<ApiServiceDescriptor, ManagedChannel> channelFactory,
+ BundleProcessorCache processBundleCache,
+ PipelineOptions options) {
+ BeamFnWorkerStatusGrpc.BeamFnWorkerStatusStub stub =
+ BeamFnWorkerStatusGrpc.newStub(channelFactory.apply(apiServiceDescriptor));
+ this.outboundObserver = stub.workerStatus(new InboundObserver());
+ this.processBundleCache = processBundleCache;
+ this.memoryMonitor = MemoryMonitor.fromOptions(options);
+ Thread thread = new Thread(memoryMonitor);
+ thread.setDaemon(true);
+ thread.setPriority(Thread.MIN_PRIORITY);
+ thread.setName("MemoryMonitor");
+ thread.start();
+ }
+
+ /**
+ * Class representing the execution state of a thread.
+ *
+ * <p>Can be used in hash maps.
+ */
+ static class Stack {
+ final StackTraceElement[] elements;
+ final Thread.State state;
+
+ Stack(StackTraceElement[] elements, Thread.State state) {
+ this.elements = elements;
+ this.state = state;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(Arrays.deepHashCode(elements), state);
+ }
+
+ @Override
+ public boolean equals(@Nullable Object other) {
+ if (other == this) {
+ return true;
+ } else if (!(other instanceof Stack)) {
+ return false;
+ } else {
+ Stack that = (Stack) other;
+ return state == that.state && Arrays.deepEquals(elements, that.elements);
+ }
+ }
+ }
+
+ String getThreadDump() {
+ StringJoiner trace = new StringJoiner("\n");
+ trace.add("========== THREAD DUMP ==========");
+ // filter duplicates.
+ Map<Stack, List<String>> stacks = new HashMap<>();
+ Thread.getAllStackTraces()
+ .forEach(
+ (thread, elements) -> {
+ if (thread != Thread.currentThread()) {
+ Stack stack = new Stack(elements, thread.getState());
+ stacks.putIfAbsent(stack, new ArrayList<>());
+ stacks.get(stack).add(thread.toString());
+ }
+ });
+
+ // Stacks with more threads are printed first.
+ stacks.entrySet().stream()
+ .sorted(Comparator.comparingInt(entry -> -entry.getValue().size()))
+ .forEachOrdered(
+ entry -> {
+ Stack stack = entry.getKey();
+ List<String> threads = entry.getValue();
+ trace.add(
+ String.format(
+ "---- Threads (%d): %s State: %s Stack: ----",
+ threads.size(), threads, stack.state));
+ Arrays.stream(stack.elements).map(StackTraceElement::toString).forEach(trace::add);
+ trace.add("\n");
+ });
+ return trace.toString();
+ }
+
+ String getMemoryUsage() {
+ StringJoiner memory = new StringJoiner("\n");
+ memory.add("========== MEMORY USAGE ==========");
+ memory.add(memoryMonitor.describeMemory());
+ return memory.toString();
+ }
+ /** Class representing the execution state of a bundle. */
+ static class BundleState {
+ final String instruction;
+ final String trackedThreadName;
+ final long timeSinceTransition;
+
+ public String getInstruction() {
+ return instruction;
+ }
+
+ public String getTrackedThreadName() {
+ return trackedThreadName;
+ }
+
+ public long getTimeSinceTransition() {
+ return timeSinceTransition;
+ }
+
+ public BundleState(String instruction, String trackedThreadName, long timeSinceTransition) {
+ this.instruction = instruction;
+ this.trackedThreadName = trackedThreadName;
+ this.timeSinceTransition = timeSinceTransition;
+ }
+ }
+
+ @VisibleForTesting
+ String getActiveProcessBundleState() {
+ StringJoiner activeBundlesState = new StringJoiner("\n");
+ activeBundlesState.add("========== ACTIVE PROCESSING BUNDLES ==========");
+ if (processBundleCache.getActiveBundleProcessors().isEmpty()) {
+ activeBundlesState.add("No active processing bundles.");
+ } else {
+ List<BundleState> bundleStates = new ArrayList<>();
+ processBundleCache.getActiveBundleProcessors().keySet().stream()
+ .forEach(
+ instruction -> {
+ BundleProcessor bundleProcessor = processBundleCache.find(instruction);
+ if (bundleProcessor != null) {
+ ExecutionStateTracker executionStateTracker = bundleProcessor.getStateTracker();
+ Thread trackedTread = executionStateTracker.getTrackedThread();
+ if (trackedTread != null) {
+ bundleStates.add(
+ new BundleState(
+ instruction,
+ trackedTread.getName(),
+ executionStateTracker.getMillisSinceLastTransition()));
+ }
+ }
+ });
+ bundleStates.stream()
+ // reverse sort active bundle by time since last transition.
+ .sorted(Comparator.comparing(BundleState::getTimeSinceTransition).reversed())
+ .limit(10) // only keep top 10
+ .forEachOrdered(
+ bundleState -> {
+ activeBundlesState.add(
+ String.format("---- Instruction %s ----", bundleState.getInstruction()));
+ activeBundlesState.add(
+ String.format("Tracked thread: %s", bundleState.getTrackedThreadName()));
+ activeBundlesState.add(
+ String.format(
+ "Time since transition: %.2f seconds%n",
+ bundleState.getTimeSinceTransition() / 1000.0));
+ });
+ }
+ return activeBundlesState.toString();
+ }
+
+ private class InboundObserver implements StreamObserver<BeamFnApi.WorkerStatusRequest> {
+ @Override
+ public void onNext(WorkerStatusRequest workerStatusRequest) {
+ StringJoiner status = new StringJoiner("\n");
+ status.add(getMemoryUsage());
+ status.add("\n");
+ status.add(getActiveProcessBundleState());
+ status.add("\n");
+ status.add(getThreadDump());
+ outboundObserver.onNext(
+ WorkerStatusResponse.newBuilder()
+ .setId(workerStatusRequest.getId())
+ .setStatusInfo(status.toString())
+ .build());
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ LOG.error("Error getting SDK harness status", t);
+ }
+
+ @Override
+ public void onCompleted() {}
+ }
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/MemoryMonitor.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/MemoryMonitor.java
new file mode 100644
index 0000000..2a444d6
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/MemoryMonitor.java
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.status;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AtomicDouble;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A runnable which monitors a server for GC thrashing.
+ *
+ * <p>Note: Only one instance of this should be initialized per server and it should be done when
+ * the server starts running.
+ *
+ * <p>This runnable works as follows:
+ *
+ * <ul>
+ * <li>It wakes up periodically and determines how much time was spend on garbage collection since
+ * the last time it woke up.
+ * <li>If the time spent in garbage collection in the last period of time exceeds a certain
+ * threshold, that period is marked as "being in GC thrashing"
+ * <li>It keeps track of the GC thrashing status of the last few periods.
+ * <li>Every time the runnable's thread wakes up, it computes the ratio {@code (# monitored
+ * periods in GC thrashing) / (# monitored periods)}.
+ * <li>If this ratio exceeds a certain threshold, it is assumed that the server is in GC
+ * thrashing.
+ * <li>It can also shutdown the current jvm runtime when a threshold of consecutive gc thrashing
+ * count is met. A heap dump is made before shutdown.
+ * </ul>
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class MemoryMonitor implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryMonitor.class);
+
+ /** Amount of time (in ms) this thread must sleep between two consecutive iterations. */
+ public static final long DEFAULT_SLEEP_TIME_MILLIS = 15 * 1000; // 15 sec.
+
+ /**
+ * The number of periods to take into account when determining if the server is in GC thrashing.
+ */
+ private static final int NUM_MONITORED_PERIODS = 4; // ie 1 min's worth.
+
+ /**
+ * The <code>(# monitored periods in GC thrashing) / (# monitored
+ * periods)</code> threshold after which the server is considered to be in GC thrashing, expressed
+ * as a percentage.
+ */
+ private static final double GC_THRASHING_PERCENTAGE_PER_SERVER = 60.0;
+
+ /**
+ * The GC thrashing threshold percentage. A given period of time is considered "thrashing" if this
+ * percentage of CPU time is spent in garbage collection.
+ *
+ * <p>If {@literal 100} is given as the value, MemoryMonitor will be disabled.
+ */
+ private static final double GC_THRASHING_PERCENTAGE_PER_PERIOD = 50.0;
+
+ /**
+ * The amount of memory (in bytes) we should pre-allocate, in order to be able to dump the heap.
+ *
+ * <p>Since the server is in GC thrashing when we try to dump the heap, we might not be able to
+ * successfully do it. However, if we pre-allocate a big enough block of memory and "release" it
+ * right before trying to dump the heap, the pre-allocated block of memory will get GCed, and the
+ * heap dump might succeed.
+ */
+ private static final int HEAP_DUMP_RESERVED_BYTES = 10 << 20; // 10MB
+
+ /**
+ * Shutdown the current JVM instance after given consecutive gc thrashing periods are detected.
+ * This offers an opportunity to fast kill a JVM server if it is about to enter a long lasting gc
+ * thrashing state, which is almost never a desired behavior for a healthy server. 0 to disable.
+ */
+ private static final int DEFAULT_SHUT_DOWN_AFTER_NUM_GCTHRASHING = 8; // ie 2 min's worth.
+
+ /** Delay between logging the current memory state. */
+ private static final int NORMAL_LOGGING_PERIOD_MILLIS = 5 * 60 * 1000; // 5 min.
+
+ /** Abstract interface for providing GC stats (for testing). */
+ public interface GCStatsProvider {
+ /** Return the total milliseconds spent in GC since JVM was started. */
+ long totalGCTimeMilliseconds();
+ }
+
+ /** True system GC stats. */
+ private static class SystemGCStatsProvider implements GCStatsProvider {
+ @Override
+ public long totalGCTimeMilliseconds() {
+ long inGC = 0;
+ for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
+ inGC += gc.getCollectionTime();
+ }
+ return inGC;
+ }
+ }
+
+ /** Where to get GC stats. */
+ private final GCStatsProvider gcStatsProvider;
+
+ /** Actual sleep time, in milliseconds, for main monitor. */
+ private final long sleepTimeMillis;
+
+ /** Actual number of cycles before shutting down VM. */
+ private final int shutDownAfterNumGCThrashing;
+
+ /**
+ * The state of the periods that are taken into account when deciding if the server is in GC
+ * thrashing.
+ */
+ private final Queue<Boolean> periodIsThrashing = new ArrayDeque<>();
+
+ /** Keeps track of the time the server spent in GC since it started running. */
+ private long timeInGC = 0;
+
+ /**
+ * A reserved block of memory, needed to dump the heap. Dumping the heap requires memory. However,
+ * since we try to do it when the server is in GC thrashing, no memory is available and dumpHeap()
+ * brings the server down. If we pre-allocate a block of memory though, and "release" it right
+ * before dumping the heap, this block of memory will be garbage collected, thus giving dumpHeap()
+ * enough space to dump the heap.
+ */
+ @SuppressFBWarnings("unused")
+ private byte[] reservedForDumpingHeap = new byte[HEAP_DUMP_RESERVED_BYTES];
+
+ /** If true, dump the heap when thrashing or requested. */
+ private final boolean canDumpHeap;
+
+ /**
+ * The GC thrashing threshold for every period. If the time spent on garbage collection in one
+ * period exceeds this threshold, that period is considered to be in GC thrashing.
+ */
+ private final double gcThrashingPercentagePerPeriod;
+
+ private final AtomicBoolean isThrashing = new AtomicBoolean(false);
+
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+ private final AtomicDouble lastMeasuredGCPercentage = new AtomicDouble(0.0);
+ private final AtomicDouble maxGCPercentage = new AtomicDouble(0.0);
+ private final AtomicInteger numPushbacks = new AtomicInteger(0);
+
+ /** Wait point for threads in pushback waiting for gc thrashing to pass. */
+ private final Object waitingForResources = new Object();
+
+ /** Wait point for threads wanting to wait for change to isRunning or isThrashing state. */
+ private final Object waitingForStateChange = new Object();
+
+ /**
+ * If non null, if a heap dump is detected during initialization upload it to the given GCS path.
+ */
+ private final @Nullable String uploadFilePath;
+
+ private final File localDumpFolder;
+
+ public static MemoryMonitor fromOptions(PipelineOptions options) {
+ String uploadFilePath = options.getTempLocation();
+ boolean canDumpHeap = uploadFilePath != null;
+
+ return new MemoryMonitor(
+ new SystemGCStatsProvider(),
+ DEFAULT_SLEEP_TIME_MILLIS,
+ DEFAULT_SHUT_DOWN_AFTER_NUM_GCTHRASHING,
+ canDumpHeap,
+ GC_THRASHING_PERCENTAGE_PER_PERIOD,
+ uploadFilePath,
+ getLoggingDir());
+ }
+
+ @VisibleForTesting
+ static MemoryMonitor forTest(
+ GCStatsProvider gcStatsProvider,
+ long sleepTimeMillis,
+ int shutDownAfterNumGCThrashing,
+ boolean canDumpHeap,
+ double gcThrashingPercentagePerPeriod,
+ @Nullable String uploadFilePath,
+ File localDumpFolder) {
+ return new MemoryMonitor(
+ gcStatsProvider,
+ sleepTimeMillis,
+ shutDownAfterNumGCThrashing,
+ canDumpHeap,
+ gcThrashingPercentagePerPeriod,
+ uploadFilePath,
+ localDumpFolder);
+ }
+
+ private MemoryMonitor(
+ GCStatsProvider gcStatsProvider,
+ long sleepTimeMillis,
+ int shutDownAfterNumGCThrashing,
+ boolean canDumpHeap,
+ double gcThrashingPercentagePerPeriod,
+ @Nullable String uploadFilePath,
+ File localDumpFolder) {
+ this.gcStatsProvider = gcStatsProvider;
+ this.sleepTimeMillis = sleepTimeMillis;
+ this.shutDownAfterNumGCThrashing = shutDownAfterNumGCThrashing;
+ this.canDumpHeap = canDumpHeap;
+ this.gcThrashingPercentagePerPeriod = gcThrashingPercentagePerPeriod;
+ this.uploadFilePath = uploadFilePath;
+ this.localDumpFolder = localDumpFolder;
+ }
+
+ /** For testing only: Wait for the monitor to be running. */
+ @VisibleForTesting
+ void waitForRunning() {
+ synchronized (waitingForStateChange) {
+ boolean interrupted = false;
+ try {
+ while (!isRunning.get()) {
+ try {
+ waitingForStateChange.wait();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ // Retry test.
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ /** For testing only: Wait for thrashing status to be updated to given value. */
+ @VisibleForTesting
+ public void waitForThrashingState(boolean desiredThrashingState) {
+ synchronized (waitingForStateChange) {
+ boolean interrupted = false;
+ try {
+ while (isThrashing.get() != desiredThrashingState) {
+ try {
+ waitingForStateChange.wait();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ // Retry test.
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ private File getDefaultHeapDumpPath() {
+ return new File(localDumpFolder, "heap_dump.hprof");
+ }
+
+ @VisibleForTesting
+ boolean tryUploadHeapDumpIfItExists() {
+ if (uploadFilePath == null) {
+ return false;
+ }
+
+ boolean uploadedHeapDump = false;
+ File localSource = getDefaultHeapDumpPath();
+ LOG.info("Looking for heap dump at {}", localSource);
+ if (localSource.exists()) {
+ LOG.warn("Heap dump {} detected, attempting to upload file to ", localSource);
+ String remoteDest =
+ String.format("%s/heap_dump%s.hprof", uploadFilePath, UUID.randomUUID().toString());
+ ResourceId resource = FileSystems.matchNewResource(remoteDest, false);
+ try {
+ uploadFile(localSource, resource);
+ uploadedHeapDump = true;
+ LOG.warn("Heap dump {} uploaded to {}", localSource, remoteDest);
+ } catch (IOException e) {
+ LOG.error("Error uploading heap dump to {}", remoteDest, e);
+ }
+
+ try {
+ Files.delete(localSource.toPath());
+ LOG.info("Deleted local heap dump {}", localSource);
+ } catch (IOException e) {
+ LOG.warn("Unable to delete local heap dump {}", localSource, e);
+ }
+ }
+
+ return uploadedHeapDump;
+ }
+
+ private void uploadFile(File srcPath, ResourceId destination) throws IOException {
+ StandardCreateOptions createOptions =
+ StandardCreateOptions.builder().setMimeType("application/octet-stream").build();
+ try (WritableByteChannel dst = FileSystems.create(destination, createOptions)) {
+ try (ReadableByteChannel src = Channels.newChannel(new FileInputStream(srcPath))) {
+ ByteStreams.copy(src, dst);
+ }
+ }
+ }
+
+ /** Request the memory monitor stops. */
+ public void stop() {
+ synchronized (waitingForStateChange) {
+ isRunning.set(false);
+ waitingForStateChange.notifyAll();
+ }
+ }
+
+ public boolean isThrashing() {
+ return isThrashing.get();
+ }
+
+ /**
+ * Check if we've observed high gc workload in sufficient sample periods to justify classifying
+ * the server as in gc thrashing.
+ */
+ private void updateIsThrashing() {
+ // have we monitored enough periods?
+ if (periodIsThrashing.size() < NUM_MONITORED_PERIODS) {
+ setIsThrashing(false);
+ return;
+ }
+
+ // count the number of periods in GC thrashing
+ int numPeriodsInGCThrashing = 0;
+ for (Boolean state : periodIsThrashing) {
+ numPeriodsInGCThrashing += (state ? 1 : 0);
+ }
+
+ // Did we have too many periods in GC thrashing?
+ boolean serverInGcThrashing =
+ (numPeriodsInGCThrashing * 100
+ >= periodIsThrashing.size() * GC_THRASHING_PERCENTAGE_PER_SERVER);
+ setIsThrashing(serverInGcThrashing);
+ }
+
+ /** Set the thrashing state. */
+ private void setIsThrashing(boolean serverInGcThrashing) {
+ synchronized (waitingForResources) {
+ synchronized (waitingForStateChange) {
+ boolean prev = isThrashing.getAndSet(serverInGcThrashing);
+ if (prev && !serverInGcThrashing) {
+ waitingForResources.notifyAll();
+ }
+ if (prev != serverInGcThrashing) {
+ waitingForStateChange.notifyAll();
+ }
+ }
+ }
+ }
+
+ /**
+ * Determines if too much time was spent on garbage collection in the last period of time.
+ *
+ * @param now The current time.
+ * @param lastTimeWokeUp The last time this thread woke up.
+ * @return The state of the last period of time.
+ */
+ private boolean wasLastPeriodInGCThrashing(long now, long lastTimeWokeUp) {
+ // Find out how much time was spent on garbage collection
+ // since the start of the server. This queries the set of garbage collectors for
+ // how long each one has spent doing GC.
+ long inGC = gcStatsProvider.totalGCTimeMilliseconds();
+
+ // Compare the amount of time spent in GC thrashing to the given threshold;
+ // if config.getSleepTimeMillis() is equal to 0 (should happen in tests only),
+ // then we compare percentage-per-period to 100%
+ double gcPercentage = (inGC - timeInGC) * 100.0 / (now - lastTimeWokeUp);
+
+ lastMeasuredGCPercentage.set(gcPercentage);
+ maxGCPercentage.set(Math.max(maxGCPercentage.get(), gcPercentage));
+ timeInGC = inGC;
+
+ return gcPercentage > this.gcThrashingPercentagePerPeriod;
+ }
+
+ /**
+ * Updates the data we monitor.
+ *
+ * @param now The current time.
+ * @param lastTimeWokeUp The last time this thread woke up.
+ */
+ private void updateData(long now, long lastTimeWokeUp) {
+ // remove data that's no longer relevant
+ int numIntervals = NUM_MONITORED_PERIODS;
+ while (periodIsThrashing.size() >= numIntervals) {
+ periodIsThrashing.poll();
+ }
+ // store the state of the last period
+ boolean wasThrashing = wasLastPeriodInGCThrashing(now, lastTimeWokeUp);
+ periodIsThrashing.offer(wasThrashing);
+ }
+
+ /**
+ * Dumps the heap to a file and return the name of the file, or {@literal null} if the heap should
+ * not or could not be dumped.
+ *
+ * @return The name of the file the heap was dumped to, otherwise {@literal null}.
+ */
+ public @Nullable File tryToDumpHeap() {
+ if (!canDumpHeap) {
+ return null;
+ }
+
+ // Clearing this list should "release" some memory that will be needed to dump the heap.
+ // We could try to reallocate it again if we later notice memory pressure has subsided,
+ // but that is risk. Further, leaving this released may help with the memory pressure.
+ reservedForDumpingHeap = null;
+
+ try {
+ return dumpHeap();
+ } catch (Exception e) {
+ LOG.warn("Unable to dump heap: ", e);
+ return null;
+ }
+ }
+
+ @SuppressFBWarnings("DM_EXIT") // we deliberately System.exit under memory
+ private void shutDownDueToGcThrashing(int thrashingCount) {
+ File heapDumpFile = tryToDumpHeap();
+ LOG.error(
+ "Shutting down JVM after {} consecutive periods of measured GC thrashing. "
+ + "Memory is {}. Heap dump {}.",
+ thrashingCount,
+ describeMemory(),
+ heapDumpFile == null ? "not written" : ("written to '" + heapDumpFile + "'"));
+
+ System.exit(1);
+ }
+
+ /** Runs this thread. */
+ @Override
+ public void run() {
+ synchronized (waitingForStateChange) {
+ Preconditions.checkState(!isRunning.getAndSet(true), "already running");
+
+ if (this.gcThrashingPercentagePerPeriod <= 0 || this.gcThrashingPercentagePerPeriod >= 100) {
+ LOG.warn(
+ "gcThrashingPercentagePerPeriod: {} is not valid value. Not starting MemoryMonitor.",
+ this.gcThrashingPercentagePerPeriod);
+ isRunning.set(false);
+ }
+
+ waitingForStateChange.notifyAll();
+ }
+
+ // Within the memory monitor thread check to see if there is a pre-existing heap dump, and
+ // attempt to upload it. Note that this will delay the first memory monitor check.
+ tryUploadHeapDumpIfItExists();
+
+ try {
+ long lastTimeWokeUp = System.currentTimeMillis();
+ long lastLog = -1;
+ int currentThrashingCount = 0;
+ while (true) {
+ synchronized (waitingForStateChange) {
+ waitingForStateChange.wait(sleepTimeMillis);
+ }
+ if (!isRunning.get()) {
+ break;
+ }
+ long now = System.currentTimeMillis();
+
+ updateData(now, lastTimeWokeUp);
+ updateIsThrashing();
+
+ if (lastLog < 0 || lastLog + NORMAL_LOGGING_PERIOD_MILLIS < now) {
+ LOG.info("Memory is {}", describeMemory());
+ lastLog = now;
+ }
+
+ if (isThrashing.get()) {
+ currentThrashingCount++;
+
+ if (shutDownAfterNumGCThrashing > 0
+ && (currentThrashingCount >= shutDownAfterNumGCThrashing)) {
+ shutDownDueToGcThrashing(currentThrashingCount);
+ }
+ } else {
+ // Reset the counter whenever the server is evaluated not under gc thrashing.
+ currentThrashingCount = 0;
+ }
+
+ lastTimeWokeUp = now;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // most probably means that the server is shutting down
+ // in any case, there's not much we can do here
+ LOG.info("The GCThrashingMonitor was interrupted.");
+ }
+ }
+
+ /** Return only when the server is not in the GC thrashing state. */
+ public void waitForResources(String context) {
+ if (!isThrashing.get()) {
+ return;
+ }
+ numPushbacks.incrementAndGet();
+ LOG.info("Waiting for resources for {}. Memory is {}", context, describeMemory());
+ synchronized (waitingForResources) {
+ boolean interrupted = false;
+ try {
+ // No big deal if isThrashing became false in the meantime.
+ while (isThrashing.get()) {
+ try {
+ waitingForResources.wait();
+ } catch (InterruptedException e1) {
+ interrupted = true;
+ LOG.debug("waitForResources was interrupted.");
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ LOG.info("Resources granted for {}. Memory is {}", context, describeMemory());
+ }
+
+ /** Return the path for logging heap dumps. */
+ private static File getLoggingDir() {
+ return new File(System.getProperty("java.io.tmpdir"));
+ }
+
+ /**
+ * Dump the current heap profile to a file in the given directory and return its name.
+ *
+ * <p>NOTE: We deliberately don't salt the heap dump filename so as to minimize disk impact of
+ * repeated dumps. These files can be of comparable size to the local disk.
+ */
+ public File dumpHeap()
+ throws MalformedObjectNameException, InstanceNotFoundException, ReflectionException,
+ MBeanException, IOException {
+ return dumpHeap(localDumpFolder);
+ }
+
+ /**
+ * Dump the current heap profile to a file in the given directory and return its name.
+ *
+ * <p>NOTE: We deliberately don't salt the heap dump filename so as to minimize disk impact of
+ * repeated dumps. These files can be of comparable size to the local disk.
+ */
+ @VisibleForTesting
+ static synchronized File dumpHeap(File directory)
+ throws MalformedObjectNameException, InstanceNotFoundException, ReflectionException,
+ MBeanException, IOException {
+ boolean liveObjectsOnly = false;
+ File fileName = new File(directory, "heap_dump.hprof");
+ if (fileName.exists() && !fileName.delete()) {
+ throw new IOException("heap_dump.hprof already existed and couldn't be deleted!");
+ }
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName oname = new ObjectName("com.sun.management:type=HotSpotDiagnostic");
+ Object[] parameters = {fileName.getPath(), liveObjectsOnly};
+ String[] signatures = {String.class.getName(), boolean.class.getName()};
+ mbs.invoke(oname, "dumpHeap", parameters, signatures);
+
+ if (java.nio.file.FileSystems.getDefault().supportedFileAttributeViews().contains("posix")) {
+ Files.setPosixFilePermissions(
+ fileName.toPath(),
+ ImmutableSet.of(
+ PosixFilePermission.OWNER_READ,
+ PosixFilePermission.GROUP_READ,
+ PosixFilePermission.OTHERS_READ));
+ } else {
+ fileName.setReadable(true, true);
+ }
+
+ LOG.warn("Heap dumped to {}", fileName);
+
+ return fileName;
+ }
+
+ /** Return a string describing the current memory state of the server. */
+ public String describeMemory() {
+ Runtime runtime = Runtime.getRuntime();
+ long maxMemory = runtime.maxMemory();
+ long totalMemory = runtime.totalMemory();
+ long usedMemory = totalMemory - runtime.freeMemory();
+ return String.format(
+ "used/total/max = %d/%d/%d MB, GC last/max = %.2f/%.2f %%, #pushbacks=%d, gc thrashing=%s",
+ usedMemory >> 20,
+ totalMemory >> 20,
+ maxMemory >> 20,
+ lastMeasuredGCPercentage.get(),
+ maxGCPercentage.get(),
+ numPushbacks.get(),
+ isThrashing.get());
+ }
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/package-info.java
new file mode 100644
index 0000000..c3ec179
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Worker status client. */
+package org.apache.beam.fn.harness.status;
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 17f37d7..97469c4 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -227,7 +227,7 @@ public class ProcessBundleHandlerTest {
}
@Override
- ExecutionStateTracker getStateTracker() {
+ public ExecutionStateTracker getStateTracker() {
return wrappedBundleProcessor.getStateTracker();
}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
new file mode 100644
index 0000000..7905a2c
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.status;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.stringContainsInOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessorCache;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.WorkerStatusRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.WorkerStatusResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc.BeamFnWorkerStatusImplBase;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BeamFnStatusClientTest {
+ private final Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+ Endpoints.ApiServiceDescriptor.newBuilder()
+ .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+ .build();
+
+ @Test
+ public void testActiveBundleState() {
+ ProcessBundleHandler handler = mock(ProcessBundleHandler.class);
+ BundleProcessorCache processorCache = mock(BundleProcessorCache.class);
+ Map<String, BundleProcessor> bundleProcessorMap = new HashMap<>();
+ for (int i = 0; i < 11; i++) {
+ BundleProcessor processor = mock(BundleProcessor.class);
+ ExecutionStateTracker executionStateTracker = mock(ExecutionStateTracker.class);
+ when(processor.getStateTracker()).thenReturn(executionStateTracker);
+ when(executionStateTracker.getMillisSinceLastTransition())
+ .thenReturn(Integer.toUnsignedLong((10 - i) * 1000));
+ when(executionStateTracker.getTrackedThread()).thenReturn(Thread.currentThread());
+ String instruction = Integer.toString(i);
+ when(processorCache.find(instruction)).thenReturn(processor);
+ bundleProcessorMap.put(instruction, processor);
+ }
+ when(handler.getBundleProcessorCache()).thenReturn(processorCache);
+ when(processorCache.getActiveBundleProcessors()).thenReturn(bundleProcessorMap);
+
+ ManagedChannelFactory channelFactory = InProcessManagedChannelFactory.create();
+ BeamFnStatusClient client =
+ new BeamFnStatusClient(
+ apiServiceDescriptor,
+ channelFactory::forDescriptor,
+ handler.getBundleProcessorCache(),
+ PipelineOptionsFactory.create());
+ StringJoiner joiner = new StringJoiner("\n");
+ joiner.add(client.getActiveProcessBundleState());
+ String actualState = joiner.toString();
+
+ List<String> expectedInstructions = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ expectedInstructions.add(String.format("Instruction %d", i));
+ }
+ assertThat(actualState, stringContainsInOrder(expectedInstructions));
+ assertThat(actualState, not(containsString("Instruction 10")));
+ }
+
+ @Test
+ public void testWorkerStatusResponse() throws Exception {
+ BlockingQueue<WorkerStatusResponse> values = new LinkedBlockingQueue<>();
+ BlockingQueue<StreamObserver<WorkerStatusRequest>> requestObservers =
+ new LinkedBlockingQueue<>();
+ StreamObserver<WorkerStatusResponse> inboundServerObserver =
+ TestStreams.withOnNext(values::add).build();
+ Server server =
+ InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(
+ new BeamFnWorkerStatusImplBase() {
+ @Override
+ public StreamObserver<WorkerStatusResponse> workerStatus(
+ StreamObserver<WorkerStatusRequest> responseObserver) {
+ Uninterruptibles.putUninterruptibly(requestObservers, responseObserver);
+ return inboundServerObserver;
+ }
+ })
+ .build();
+ server.start();
+
+ try {
+ BundleProcessorCache processorCache = mock(BundleProcessorCache.class);
+ when(processorCache.getActiveBundleProcessors()).thenReturn(Collections.emptyMap());
+ ManagedChannelFactory channelFactory = InProcessManagedChannelFactory.create();
+ BeamFnStatusClient client =
+ new BeamFnStatusClient(
+ apiServiceDescriptor,
+ channelFactory::forDescriptor,
+ processorCache,
+ PipelineOptionsFactory.create());
+ StreamObserver<WorkerStatusRequest> requestObserver = requestObservers.take();
+ requestObserver.onNext(WorkerStatusRequest.newBuilder().setId("id").build());
+ WorkerStatusResponse response = values.take();
+ assertThat(response.getStatusInfo(), containsString("No active processing bundles."));
+ assertThat(response.getId(), is("id"));
+ } finally {
+ server.shutdownNow();
+ }
+ }
+}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/MemoryMonitorTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/MemoryMonitorTest.java
new file mode 100644
index 0000000..2fa407c
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/MemoryMonitorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.status;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test the memory monitor will block threads when the server is in a (faked) GC thrashing state.
+ */
+@RunWith(JUnit4.class)
+public class MemoryMonitorTest {
+
+ @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ static class FakeGCStatsProvider implements MemoryMonitor.GCStatsProvider {
+ AtomicBoolean inGCThrashingState = new AtomicBoolean(false);
+ long lastCallTimestamp = System.currentTimeMillis();
+ long lastGCResult = 0;
+
+ @Override
+ public long totalGCTimeMilliseconds() {
+ if (inGCThrashingState.get()) {
+ long now = System.currentTimeMillis();
+ lastGCResult += now - lastCallTimestamp;
+ lastCallTimestamp = now;
+ }
+ return lastGCResult;
+ }
+ }
+
+ private FakeGCStatsProvider provider;
+ private File localDumpFolder;
+ private MemoryMonitor monitor;
+ private Thread thread;
+
+ @Before
+ public void setup() throws IOException {
+ provider = new FakeGCStatsProvider();
+ localDumpFolder = tempFolder.newFolder();
+ // Update every 10ms, never shutdown VM.
+ monitor = MemoryMonitor.forTest(provider, 10, 0, false, 50.0, null, localDumpFolder);
+ thread = new Thread(monitor);
+ thread.start();
+ }
+
+ @Test(timeout = 1000)
+ public void detectGCThrashing() throws InterruptedException {
+ monitor.waitForRunning();
+ monitor.waitForResources("Test1");
+ provider.inGCThrashingState.set(true);
+ monitor.waitForThrashingState(true);
+ final Semaphore s = new Semaphore(0);
+ new Thread(
+ () -> {
+ monitor.waitForResources("Test2");
+ s.release();
+ })
+ .start();
+ assertFalse(s.tryAcquire(100, TimeUnit.MILLISECONDS));
+ provider.inGCThrashingState.set(false);
+ monitor.waitForThrashingState(false);
+ assertTrue(s.tryAcquire(100, TimeUnit.MILLISECONDS));
+ monitor.waitForResources("Test3");
+ }
+
+ @Test
+ public void heapDumpOnce() throws Exception {
+ File folder = tempFolder.newFolder();
+
+ File dump1 = MemoryMonitor.dumpHeap(folder);
+ assertNotNull(dump1);
+ assertTrue(dump1.exists());
+ assertThat(dump1.getParentFile(), Matchers.equalTo(folder));
+ }
+
+ @Test
+ public void heapDumpTwice() throws Exception {
+ File folder = tempFolder.newFolder();
+
+ File dump1 = MemoryMonitor.dumpHeap(folder);
+ assertNotNull(dump1);
+ assertTrue(dump1.exists());
+ assertThat(dump1.getParentFile(), Matchers.equalTo(folder));
+
+ File dump2 = MemoryMonitor.dumpHeap(folder);
+ assertNotNull(dump2);
+ assertTrue(dump2.exists());
+ assertThat(dump2.getParentFile(), Matchers.equalTo(folder));
+ }
+
+ @Test
+ public void uploadFile() throws Exception {
+ File remoteFolder = tempFolder.newFolder();
+ monitor =
+ MemoryMonitor.forTest(provider, 10, 0, true, 50.0, remoteFolder.getPath(), localDumpFolder);
+
+ // Force the monitor to generate a local heap dump
+ monitor.dumpHeap();
+
+ // Try to upload the heap dump
+ assertTrue(monitor.tryUploadHeapDumpIfItExists());
+
+ File[] files = remoteFolder.listFiles();
+ assertThat(files, Matchers.arrayWithSize(1));
+ assertThat(files[0].getAbsolutePath(), Matchers.containsString("heap_dump"));
+ assertThat(files[0].getAbsolutePath(), Matchers.containsString("hprof"));
+ }
+
+ @Test
+ public void uploadFileDisabled() throws Exception {
+ monitor = MemoryMonitor.forTest(provider, 10, 0, true, 50.0, null, localDumpFolder);
+
+ // Force the monitor to generate a local heap dump
+ monitor.dumpHeap();
+
+ // Try to upload the heap dump
+ assertFalse(monitor.tryUploadHeapDumpIfItExists());
+ }
+
+ @Test
+ public void disableMemoryMonitor() throws Exception {
+ MemoryMonitor disabledMonitor =
+ MemoryMonitor.forTest(provider, 10, 0, true, 100.0, null, localDumpFolder);
+ Thread disabledMonitorThread = new Thread(disabledMonitor);
+ disabledMonitorThread.start();
+
+ // Monitor thread should stop quickly after starting. Wait 10 seconds, and check that monitor
+ // thread is not alive.
+ disabledMonitorThread.join(10000);
+ assertFalse(disabledMonitorThread.isAlive());
+
+ // Enabled monitor thread should still be running.
+ assertTrue(thread.isAlive());
+ }
+}