You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2021/04/13 19:43:27 UTC

[beam] branch master updated: [BEAM-11945] Add debug capture to SDK harness (#14197)

This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 67badf6  [BEAM-11945] Add debug capture to SDK harness (#14197)
67badf6 is described below

commit 67badf644eddacbbda1d4c2131c22dda1ee29d30
Author: kileys <ki...@google.com>
AuthorDate: Tue Apr 13 12:42:40 2021 -0700

    [BEAM-11945] Add debug capture to SDK harness (#14197)
---
 .../environment/EmbeddedEnvironmentFactory.java    |   1 +
 .../fnexecution/control/RemoteExecutionTest.java   |   1 +
 .../runners/portability/ExternalWorkerService.java |   3 +-
 .../java/org/apache/beam/fn/harness/FnHarness.java |  32 +-
 .../fn/harness/control/ProcessBundleHandler.java   |  12 +-
 .../beam/fn/harness/status/BeamFnStatusClient.java | 230 ++++++++
 .../beam/fn/harness/status/MemoryMonitor.java      | 635 +++++++++++++++++++++
 .../beam/fn/harness/status/package-info.java       |  20 +
 .../harness/control/ProcessBundleHandlerTest.java  |   2 +-
 .../fn/harness/status/BeamFnStatusClientTest.java  | 142 +++++
 .../beam/fn/harness/status/MemoryMonitorTest.java  | 166 ++++++
 11 files changed, 1238 insertions(+), 6 deletions(-)

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
index 73e14f6..0b617e9 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
@@ -102,6 +102,7 @@ public class EmbeddedEnvironmentFactory implements EnvironmentFactory {
                     options,
                     loggingServer.getApiServiceDescriptor(),
                     controlServer.getApiServiceDescriptor(),
+                    null,
                     InProcessManagedChannelFactory.create(),
                     OutboundObserverFactory.clientDirect());
               } catch (NoClassDefFoundError e) {
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 43e6c16..d37c41a 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -214,6 +214,7 @@ public class RemoteExecutionTest implements Serializable {
                     PipelineOptionsFactory.create(),
                     loggingServer.getApiServiceDescriptor(),
                     controlServer.getApiServiceDescriptor(),
+                    null,
                     InProcessManagedChannelFactory.create(),
                     OutboundObserverFactory.clientDirect());
               } catch (Exception e) {
diff --git a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
index c4311a3..bbb7760 100644
--- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
+++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/ExternalWorkerService.java
@@ -59,7 +59,8 @@ public class ExternalWorkerService extends BeamFnExternalWorkerPoolImplBase impl
                     request.getWorkerId(),
                     options,
                     request.getLoggingEndpoint(),
-                    request.getControlEndpoint());
+                    request.getControlEndpoint(),
+                    null);
                 LOG.info("Successfully started worker {}.", request.getWorkerId());
               } catch (Exception exn) {
                 LOG.error(String.format("Failed to start worker %s.", request.getWorkerId()), exn);
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 3a7b741..34ab171 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import javax.annotation.Nullable;
 import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
 import org.apache.beam.fn.harness.control.BeamFnControlClient;
 import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
@@ -29,6 +30,7 @@ import org.apache.beam.fn.harness.control.ProcessBundleHandler;
 import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
 import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
+import org.apache.beam.fn.harness.status.BeamFnStatusClient;
 import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
@@ -81,6 +83,7 @@ public class FnHarness {
   private static final String HARNESS_ID = "HARNESS_ID";
   private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
   private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
+  private static final String STATUS_API_SERVICE_DESCRIPTOR = "STATUS_API_SERVICE_DESCRIPTOR";
   private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
   private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);
 
@@ -105,6 +108,8 @@ public class FnHarness {
         "Logging location %s%n", environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
     System.out.format(
         "Control location %s%n", environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
+    System.out.format(
+        "Status location %s%n", environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
     System.out.format("Pipeline options %s%n", environmentVarGetter.apply(PIPELINE_OPTIONS));
 
     String id = environmentVarGetter.apply(HARNESS_ID);
@@ -117,7 +122,16 @@ public class FnHarness {
     Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
         getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
 
-    main(id, options, loggingApiServiceDescriptor, controlApiServiceDescriptor);
+    Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
+        environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
+            ? null
+            : getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
+    main(
+        id,
+        options,
+        loggingApiServiceDescriptor,
+        controlApiServiceDescriptor,
+        statusApiServiceDescriptor);
   }
 
   /**
@@ -128,13 +142,15 @@ public class FnHarness {
    * @param options The options for this pipeline
    * @param loggingApiServiceDescriptor
    * @param controlApiServiceDescriptor
+   * @param statusApiServiceDescriptor
    * @throws Exception
    */
   public static void main(
       String id,
       PipelineOptions options,
       Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
-      Endpoints.ApiServiceDescriptor controlApiServiceDescriptor)
+      Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
+      @Nullable Endpoints.ApiServiceDescriptor statusApiServiceDescriptor)
       throws Exception {
     ManagedChannelFactory channelFactory;
     List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
@@ -152,6 +168,7 @@ public class FnHarness {
         options,
         loggingApiServiceDescriptor,
         controlApiServiceDescriptor,
+        statusApiServiceDescriptor,
         channelFactory,
         outboundObserverFactory);
   }
@@ -164,6 +181,7 @@ public class FnHarness {
    * @param options The options for this pipeline
    * @param loggingApiServiceDescriptor
    * @param controlApiServiceDescriptor
+   * @param statusApiServiceDescriptor
    * @param channelFactory
    * @param outboundObserverFactory
    * @throws Exception
@@ -173,6 +191,7 @@ public class FnHarness {
       PipelineOptions options,
       Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
       Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
+      Endpoints.ApiServiceDescriptor statusApiServiceDescriptor,
       ManagedChannelFactory channelFactory,
       OutboundObserverFactory outboundObserverFactory)
       throws Exception {
@@ -229,6 +248,15 @@ public class FnHarness {
               beamFnDataMultiplexer,
               beamFnStateGrpcClientCache,
               finalizeBundleHandler);
+
+      if (statusApiServiceDescriptor != null) {
+        new BeamFnStatusClient(
+            statusApiServiceDescriptor,
+            channelFactory::forDescriptor,
+            processBundleHandler.getBundleProcessorCache(),
+            options);
+      }
+
       // TODO(BEAM-9729): Remove once runners no longer send this instruction.
       handlers.put(
           BeamFnApi.InstructionRequest.RequestCase.REGISTER,
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 99b794e..0c7f39d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -538,6 +538,10 @@ public class ProcessBundleHandler {
     return bundleProcessor;
   }
 
+  /** Returns the cache holding this handler's cached and currently active bundle processors. */
+  public BundleProcessorCache getBundleProcessorCache() {
+    return this.bundleProcessorCache;
+  }
+
   /** A cache for {@link BundleProcessor}s. */
   public static class BundleProcessorCache {
 
@@ -579,6 +583,10 @@ public class ProcessBundleHandler {
       return ImmutableMap.copyOf(cachedBundleProcessors.asMap());
     }
 
+    public Map<String, BundleProcessor> getActiveBundleProcessors() {
+      return ImmutableMap.copyOf(activeBundleProcessors);
+    }
+
     /**
      * Get a {@link BundleProcessor} from the cache if it's available. Otherwise, create one using
      * the specified {@code bundleProcessorSupplier}. The {@link BundleProcessor} that is returned
@@ -607,7 +615,7 @@ public class ProcessBundleHandler {
      * Finds an active bundle processor for the specified {@code instructionId} or null if one could
      * not be found.
      */
-    BundleProcessor find(String instructionId) {
+    public BundleProcessor find(String instructionId) {
       return activeBundleProcessors.get(instructionId);
     }
 
@@ -686,7 +694,7 @@ public class ProcessBundleHandler {
 
     abstract MetricsContainerStepMap getMetricsContainerRegistry();
 
-    abstract ExecutionStateTracker getStateTracker();
+    public abstract ExecutionStateTracker getStateTracker();
 
     abstract HandleStateCallsForBundle getBeamFnStateClient();
 
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java
new file mode 100644
index 0000000..4c01c04
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/BeamFnStatusClient.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.status;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.StringJoiner;
+import java.util.function.Function;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessorCache;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.WorkerStatusRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.WorkerStatusResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client for the Fn API worker status service.
+ *
+ * <p>Opens a {@code WorkerStatus} stream to the runner and answers every {@link
+ * WorkerStatusRequest} with a plain-text report of memory usage, the longest-running active
+ * bundles, and a de-duplicated thread dump of this SDK harness.
+ */
+public class BeamFnStatusClient {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamFnStatusClient.class);
+
+  /** Maximum number of active bundles included in a status report. */
+  private static final int MAX_REPORTED_BUNDLES = 10;
+
+  private final StreamObserver<WorkerStatusResponse> outboundObserver;
+  private final BundleProcessorCache processBundleCache;
+  private final MemoryMonitor memoryMonitor;
+
+  /**
+   * Starts a low-priority daemon {@link MemoryMonitor} thread and opens the worker status stream.
+   *
+   * @param apiServiceDescriptor endpoint of the runner's worker status service
+   * @param channelFactory creates the channel used to reach {@code apiServiceDescriptor}
+   * @param processBundleCache source of the currently active bundle processors
+   * @param options pipeline options used to configure the {@link MemoryMonitor}
+   */
+  public BeamFnStatusClient(
+      ApiServiceDescriptor apiServiceDescriptor,
+      Function<ApiServiceDescriptor, ManagedChannel> channelFactory,
+      BundleProcessorCache processBundleCache,
+      PipelineOptions options) {
+    BeamFnWorkerStatusGrpc.BeamFnWorkerStatusStub stub =
+        BeamFnWorkerStatusGrpc.newStub(channelFactory.apply(apiServiceDescriptor));
+    // Initialize all state read by InboundObserver before the stream is opened: status requests
+    // are delivered on a gRPC thread and may arrive as soon as the call starts.
+    this.processBundleCache = processBundleCache;
+    this.memoryMonitor = MemoryMonitor.fromOptions(options);
+    Thread thread = new Thread(memoryMonitor);
+    thread.setDaemon(true);
+    thread.setPriority(Thread.MIN_PRIORITY);
+    thread.setName("MemoryMonitor");
+    thread.start();
+    // NOTE(review): outboundObserver itself is only assigned after workerStatus() returns, so a
+    // request racing with this assignment could still observe it unset.
+    this.outboundObserver = stub.workerStatus(new InboundObserver());
+  }
+
+  /**
+   * A thread's stack trace together with its {@link Thread.State}.
+   *
+   * <p>Implements value-based {@code equals}/{@code hashCode} so that threads with identical stacks
+   * can be grouped as keys of a hash map.
+   */
+  static class Stack {
+    final StackTraceElement[] elements;
+    final Thread.State state;
+
+    Stack(StackTraceElement[] elements, Thread.State state) {
+      this.elements = elements;
+      this.state = state;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(Arrays.deepHashCode(elements), state);
+    }
+
+    @Override
+    public boolean equals(@Nullable Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (!(other instanceof Stack)) {
+        return false;
+      }
+      Stack that = (Stack) other;
+      return state == that.state && Arrays.deepEquals(elements, that.elements);
+    }
+  }
+
+  /**
+   * Returns a thread dump of every thread except the calling one.
+   *
+   * <p>Threads with identical stacks and states are reported together; groups containing more
+   * threads are printed first.
+   */
+  String getThreadDump() {
+    StringJoiner trace = new StringJoiner("\n");
+    trace.add("========== THREAD DUMP ==========");
+    Thread currentThread = Thread.currentThread();
+    // Group threads by (stack, state) to filter out duplicate stacks.
+    Map<Stack, List<String>> stacks = new HashMap<>();
+    Thread.getAllStackTraces()
+        .forEach(
+            (thread, elements) -> {
+              if (thread != currentThread) {
+                Stack stack = new Stack(elements, thread.getState());
+                stacks.computeIfAbsent(stack, unused -> new ArrayList<>()).add(thread.toString());
+              }
+            });
+
+    // Stacks with more threads are printed first.
+    stacks.entrySet().stream()
+        .sorted(Comparator.comparingInt(entry -> -entry.getValue().size()))
+        .forEachOrdered(
+            entry -> {
+              Stack stack = entry.getKey();
+              List<String> threads = entry.getValue();
+              trace.add(
+                  String.format(
+                      "---- Threads (%d): %s State: %s Stack: ----",
+                      threads.size(), threads, stack.state));
+              Arrays.stream(stack.elements).map(StackTraceElement::toString).forEach(trace::add);
+              trace.add("\n");
+            });
+    return trace.toString();
+  }
+
+  /** Returns the memory section of the status report, as described by the {@link MemoryMonitor}. */
+  String getMemoryUsage() {
+    StringJoiner memory = new StringJoiner("\n");
+    memory.add("========== MEMORY USAGE ==========");
+    memory.add(memoryMonitor.describeMemory());
+    return memory.toString();
+  }
+
+  /** Class representing the execution state of a bundle. */
+  static class BundleState {
+    final String instruction;
+    final String trackedThreadName;
+    final long timeSinceTransition;
+
+    public BundleState(String instruction, String trackedThreadName, long timeSinceTransition) {
+      this.instruction = instruction;
+      this.trackedThreadName = trackedThreadName;
+      this.timeSinceTransition = timeSinceTransition;
+    }
+
+    public String getInstruction() {
+      return instruction;
+    }
+
+    public String getTrackedThreadName() {
+      return trackedThreadName;
+    }
+
+    public long getTimeSinceTransition() {
+      return timeSinceTransition;
+    }
+  }
+
+  /**
+   * Returns the active-bundle section of the status report.
+   *
+   * <p>Only bundles with a tracked thread are listed, at most {@link #MAX_REPORTED_BUNDLES} of them,
+   * ordered by the longest time since their last state transition.
+   */
+  @VisibleForTesting
+  String getActiveProcessBundleState() {
+    StringJoiner activeBundlesState = new StringJoiner("\n");
+    activeBundlesState.add("========== ACTIVE PROCESSING BUNDLES ==========");
+    // Take a single snapshot so the emptiness check and the report see the same set of bundles.
+    Map<String, BundleProcessor> activeBundleProcessors =
+        processBundleCache.getActiveBundleProcessors();
+    if (activeBundleProcessors.isEmpty()) {
+      activeBundlesState.add("No active processing bundles.");
+      return activeBundlesState.toString();
+    }
+
+    List<BundleState> bundleStates = new ArrayList<>();
+    for (Map.Entry<String, BundleProcessor> entry : activeBundleProcessors.entrySet()) {
+      ExecutionStateTracker executionStateTracker = entry.getValue().getStateTracker();
+      Thread trackedThread = executionStateTracker.getTrackedThread();
+      if (trackedThread != null) {
+        bundleStates.add(
+            new BundleState(
+                entry.getKey(),
+                trackedThread.getName(),
+                executionStateTracker.getMillisSinceLastTransition()));
+      }
+    }
+
+    bundleStates.stream()
+        // Longest time since the last transition first.
+        .sorted(Comparator.comparingLong(BundleState::getTimeSinceTransition).reversed())
+        .limit(MAX_REPORTED_BUNDLES)
+        .forEachOrdered(
+            bundleState -> {
+              activeBundlesState.add(
+                  String.format("---- Instruction %s ----", bundleState.getInstruction()));
+              activeBundlesState.add(
+                  String.format("Tracked thread: %s", bundleState.getTrackedThreadName()));
+              activeBundlesState.add(
+                  String.format(
+                      "Time since transition: %.2f seconds%n",
+                      bundleState.getTimeSinceTransition() / 1000.0));
+            });
+    return activeBundlesState.toString();
+  }
+
+  /** Builds the full plain-text status report returned for a {@link WorkerStatusRequest}. */
+  private String getStatusInfo() {
+    StringJoiner status = new StringJoiner("\n");
+    status.add(getMemoryUsage());
+    status.add("\n");
+    status.add(getActiveProcessBundleState());
+    status.add("\n");
+    status.add(getThreadDump());
+    return status.toString();
+  }
+
+  /** Answers each status request from the runner on the outbound stream. */
+  private class InboundObserver implements StreamObserver<BeamFnApi.WorkerStatusRequest> {
+    @Override
+    public void onNext(WorkerStatusRequest workerStatusRequest) {
+      WorkerStatusResponse.Builder response =
+          WorkerStatusResponse.newBuilder().setId(workerStatusRequest.getId());
+      try {
+        response.setStatusInfo(getStatusInfo());
+      } catch (RuntimeException e) {
+        // An exception escaping onNext would make gRPC cancel the call and end the status stream
+        // for all future requests; report the failure for this request instead.
+        LOG.warn("Error generating SDK harness status", e);
+        response.setError("Exception encountered while generating status: " + e);
+      }
+      outboundObserver.onNext(response.build());
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      LOG.error("Error getting SDK harness status", t);
+    }
+
+    @Override
+    public void onCompleted() {}
+  }
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/MemoryMonitor.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/MemoryMonitor.java
new file mode 100644
index 0000000..2a444d6
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/MemoryMonitor.java
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.status;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AtomicDouble;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A runnable which monitors a server for GC thrashing.
+ *
+ * <p>Note: Only one instance of this should be initialized per server and it should be done when
+ * the server starts running.
+ *
+ * <p>This runnable works as follows:
+ *
+ * <ul>
+ *   <li>It wakes up periodically and determines how much time was spent on garbage collection since
+ *       the last time it woke up.
+ *   <li>If the time spent in garbage collection in the last period of time exceeds a certain
+ *       threshold, that period is marked as "being in GC thrashing"
+ *   <li>It keeps track of the GC thrashing status of the last few periods.
+ *   <li>Every time the runnable's thread wakes up, it computes the ratio {@code (# monitored
+ *       periods in GC thrashing) / (# monitored periods)}.
+ *   <li>If this ratio exceeds a certain threshold, it is assumed that the server is in GC
+ *       thrashing.
+ *   <li>It can also shut down the current JVM when a threshold of consecutive GC-thrashing periods
+ *       is met. A heap dump is made before shutdown.
+ * </ul>
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class MemoryMonitor implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryMonitor.class);
+
+  /** Amount of time (in ms) this thread must sleep between two consecutive iterations. */
+  public static final long DEFAULT_SLEEP_TIME_MILLIS = 15 * 1000; // 15 sec.
+
+  /**
+   * The number of periods to take into account when determining if the server is in GC thrashing.
+   */
+  private static final int NUM_MONITORED_PERIODS = 4; // ie 1 min's worth.
+
+  /**
+   * The <code>(# monitored periods in GC thrashing) / (# monitored
+   * periods)</code> threshold after which the server is considered to be in GC thrashing, expressed
+   * as a percentage.
+   */
+  private static final double GC_THRASHING_PERCENTAGE_PER_SERVER = 60.0;
+
+  /**
+   * The GC thrashing threshold percentage. A given period of time is considered "thrashing" if this
+   * percentage of CPU time is spent in garbage collection.
+   *
+   * <p>If {@literal 100} is given as the value, MemoryMonitor will be disabled.
+   */
+  private static final double GC_THRASHING_PERCENTAGE_PER_PERIOD = 50.0;
+
+  /**
+   * The amount of memory (in bytes) we should pre-allocate, in order to be able to dump the heap.
+   *
+   * <p>Since the server is in GC thrashing when we try to dump the heap, we might not be able to
+   * successfully do it. However, if we pre-allocate a big enough block of memory and "release" it
+   * right before trying to dump the heap, the pre-allocated block of memory will get GCed, and the
+   * heap dump might succeed.
+   */
+  private static final int HEAP_DUMP_RESERVED_BYTES = 10 << 20; // 10MB
+
+  /**
+   * Shutdown the current JVM instance after given consecutive gc thrashing periods are detected.
+   * This offers an opportunity to fast kill a JVM server if it is about to enter a long lasting gc
+   * thrashing state, which is almost never a desired behavior for a healthy server. 0 to disable.
+   */
+  private static final int DEFAULT_SHUT_DOWN_AFTER_NUM_GCTHRASHING = 8; // ie 2 min's worth.
+
+  /** Delay between logging the current memory state. */
+  private static final int NORMAL_LOGGING_PERIOD_MILLIS = 5 * 60 * 1000; // 5 min.
+
+  /** Source of cumulative garbage-collection time, abstracted so tests can supply fake values. */
+  @FunctionalInterface
+  public interface GCStatsProvider {
+    /** Returns the total time, in milliseconds, spent in garbage collection since JVM start. */
+    long totalGCTimeMilliseconds();
+  }
+
+  /** True system GC stats. */
+  private static class SystemGCStatsProvider implements GCStatsProvider {
+    @Override
+    public long totalGCTimeMilliseconds() {
+      long inGC = 0;
+      for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
+        inGC += gc.getCollectionTime();
+      }
+      return inGC;
+    }
+  }
+
+  /** Where to get GC stats. */
+  private final GCStatsProvider gcStatsProvider;
+
+  /** Actual sleep time, in milliseconds, for main monitor. */
+  private final long sleepTimeMillis;
+
+  /** Actual number of cycles before shutting down VM. */
+  private final int shutDownAfterNumGCThrashing;
+
+  /**
+   * The state of the periods that are taken into account when deciding if the server is in GC
+   * thrashing.
+   */
+  private final Queue<Boolean> periodIsThrashing = new ArrayDeque<>();
+
+  /** Keeps track of the time the server spent in GC since it started running. */
+  private long timeInGC = 0;
+
+  /**
+   * A reserved block of memory, needed to dump the heap. Dumping the heap requires memory. However,
+   * since we try to do it when the server is in GC thrashing, no memory is available and dumpHeap()
+   * brings the server down. If we pre-allocate a block of memory though, and "release" it right
+   * before dumping the heap, this block of memory will be garbage collected, thus giving dumpHeap()
+   * enough space to dump the heap.
+   */
+  @SuppressFBWarnings("unused")
+  private byte[] reservedForDumpingHeap = new byte[HEAP_DUMP_RESERVED_BYTES];
+
+  /** If true, dump the heap when thrashing or requested. */
+  private final boolean canDumpHeap;
+
+  /**
+   * The GC thrashing threshold for every period. If the time spent on garbage collection in one
+   * period exceeds this threshold, that period is considered to be in GC thrashing.
+   */
+  private final double gcThrashingPercentagePerPeriod;
+
+  private final AtomicBoolean isThrashing = new AtomicBoolean(false);
+
+  private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
+  private final AtomicDouble lastMeasuredGCPercentage = new AtomicDouble(0.0);
+  private final AtomicDouble maxGCPercentage = new AtomicDouble(0.0);
+  private final AtomicInteger numPushbacks = new AtomicInteger(0);
+
+  /** Wait point for threads in pushback waiting for gc thrashing to pass. */
+  private final Object waitingForResources = new Object();
+
+  /** Wait point for threads wanting to wait for change to isRunning or isThrashing state. */
+  private final Object waitingForStateChange = new Object();
+
+  /**
+   * If non-null and a heap dump is detected during initialization, upload it to this path.
+   */
+  private final @Nullable String uploadFilePath;
+
+  private final File localDumpFolder;
+
+  /**
+   * Creates a monitor that uses the system GC statistics and the default thresholds.
+   *
+   * <p>Heap dumping is enabled only when {@code options} has a temp location. That location is also
+   * the upload destination for a heap dump found on local disk.
+   */
+  public static MemoryMonitor fromOptions(PipelineOptions options) {
+    final String tempLocation = options.getTempLocation();
+    return new MemoryMonitor(
+        new SystemGCStatsProvider(),
+        DEFAULT_SLEEP_TIME_MILLIS,
+        DEFAULT_SHUT_DOWN_AFTER_NUM_GCTHRASHING,
+        /* canDumpHeap= */ tempLocation != null,
+        GC_THRASHING_PERCENTAGE_PER_PERIOD,
+        tempLocation,
+        getLoggingDir());
+  }
+
+  /** Creates a monitor with explicitly supplied collaborators and thresholds, for use in tests. */
+  @VisibleForTesting
+  static MemoryMonitor forTest(
+      GCStatsProvider gcStatsProvider,
+      long sleepTimeMillis,
+      int shutDownAfterNumGCThrashing,
+      boolean canDumpHeap,
+      double gcThrashingPercentagePerPeriod,
+      @Nullable String uploadFilePath,
+      File localDumpFolder) {
+    MemoryMonitor monitor =
+        new MemoryMonitor(
+            gcStatsProvider,
+            sleepTimeMillis,
+            shutDownAfterNumGCThrashing,
+            canDumpHeap,
+            gcThrashingPercentagePerPeriod,
+            uploadFilePath,
+            localDumpFolder);
+    return monitor;
+  }
+
+  /**
+   * Creates a monitor. Use {@code fromOptions} or {@code forTest} to obtain an instance.
+   *
+   * @param gcStatsProvider source of cumulative GC time
+   * @param sleepTimeMillis time to sleep between two monitoring iterations, in milliseconds
+   * @param shutDownAfterNumGCThrashing number of cycles before shutting down the VM
+   * @param canDumpHeap whether the heap may be dumped when thrashing or on request
+   * @param gcThrashingPercentagePerPeriod percentage of a period spent in GC above which that
+   *     period counts as thrashing
+   * @param uploadFilePath where a heap dump found on disk is uploaded, or null to disable uploads
+   * @param localDumpFolder directory containing the local heap dump file
+   */
+  private MemoryMonitor(
+      GCStatsProvider gcStatsProvider,
+      long sleepTimeMillis,
+      int shutDownAfterNumGCThrashing,
+      boolean canDumpHeap,
+      double gcThrashingPercentagePerPeriod,
+      @Nullable String uploadFilePath,
+      File localDumpFolder) {
+    this.localDumpFolder = localDumpFolder;
+    this.uploadFilePath = uploadFilePath;
+    this.gcThrashingPercentagePerPeriod = gcThrashingPercentagePerPeriod;
+    this.canDumpHeap = canDumpHeap;
+    this.shutDownAfterNumGCThrashing = shutDownAfterNumGCThrashing;
+    this.sleepTimeMillis = sleepTimeMillis;
+    this.gcStatsProvider = gcStatsProvider;
+  }
+
+  /**
+   * For testing only: blocks until {@link #run()} has marked the monitor as running. Interrupts
+   * received while waiting do not end the wait; they are re-asserted once it is over. May block
+   * indefinitely if run() is never called or the monitor disables itself.
+   */
+  @VisibleForTesting
+  void waitForRunning() {
+    boolean wasInterrupted = false;
+    synchronized (waitingForStateChange) {
+      while (!isRunning.get()) {
+        try {
+          waitingForStateChange.wait();
+        } catch (InterruptedException e) {
+          // Keep waiting; the interrupt flag is restored below.
+          wasInterrupted = true;
+        }
+      }
+    }
+    if (wasInterrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * For testing only: blocks until the thrashing state equals {@code desiredThrashingState}.
+   * Interrupts received while waiting do not end the wait; they are re-asserted once it is over.
+   */
+  @VisibleForTesting
+  public void waitForThrashingState(boolean desiredThrashingState) {
+    boolean wasInterrupted = false;
+    synchronized (waitingForStateChange) {
+      while (desiredThrashingState != isThrashing.get()) {
+        try {
+          waitingForStateChange.wait();
+        } catch (InterruptedException e) {
+          // Keep waiting; the interrupt flag is restored below.
+          wasInterrupted = true;
+        }
+      }
+    }
+    if (wasInterrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** The file {@link #dumpHeap()} writes to, and where a leftover dump is looked for. */
+  private File getDefaultHeapDumpPath() {
+    File dumpFile = new File(localDumpFolder, "heap_dump.hprof");
+    return dumpFile;
+  }
+
+  /**
+   * Uploads a heap dump left in the local dump folder to {@code uploadFilePath} under a unique
+   * name, then deletes the local copy regardless of whether the upload succeeded.
+   *
+   * <p>This is best effort: upload failures, including a destination that cannot be resolved to a
+   * registered filesystem, are logged rather than thrown. That way the monitor thread, which calls
+   * this before it starts monitoring, keeps running.
+   *
+   * @return true if a heap dump was found and uploaded; false if uploading is disabled
+   *     ({@code uploadFilePath} is null), no dump exists, or the upload failed.
+   */
+  @VisibleForTesting
+  boolean tryUploadHeapDumpIfItExists() {
+    if (uploadFilePath == null) {
+      return false;
+    }
+
+    File localSource = getDefaultHeapDumpPath();
+    LOG.info("Looking for heap dump at {}", localSource);
+    if (!localSource.exists()) {
+      return false;
+    }
+
+    boolean uploadedHeapDump = false;
+    String remoteDest =
+        String.format("%s/heap_dump%s.hprof", uploadFilePath, UUID.randomUUID().toString());
+    LOG.warn("Heap dump {} detected, attempting to upload file to {}", localSource, remoteDest);
+    try {
+      // Resolving the destination throws IllegalArgumentException when, e.g., no filesystem is
+      // registered for its scheme; keep it inside the try so that stays non-fatal.
+      ResourceId resource = FileSystems.matchNewResource(remoteDest, false);
+      uploadFile(localSource, resource);
+      uploadedHeapDump = true;
+      LOG.warn("Heap dump {} uploaded to {}", localSource, remoteDest);
+    } catch (IOException | IllegalArgumentException e) {
+      LOG.error("Error uploading heap dump to {}", remoteDest, e);
+    }
+
+    // The local copy is removed whether or not the upload succeeded.
+    try {
+      Files.delete(localSource.toPath());
+      LOG.info("Deleted local heap dump {}", localSource);
+    } catch (IOException e) {
+      LOG.warn("Unable to delete local heap dump {}", localSource, e);
+    }
+
+    return uploadedHeapDump;
+  }
+
+  /** Copies the local file {@code srcPath} byte for byte to {@code destination}. */
+  private void uploadFile(File srcPath, ResourceId destination) throws IOException {
+    StandardCreateOptions binaryContent =
+        StandardCreateOptions.builder().setMimeType("application/octet-stream").build();
+    // The destination is opened first and closed last, after the source.
+    try (WritableByteChannel target = FileSystems.create(destination, binaryContent);
+        ReadableByteChannel source = Channels.newChannel(new FileInputStream(srcPath))) {
+      ByteStreams.copy(source, target);
+    }
+  }
+
+  /**
+   * Request the memory monitor stops.
+   *
+   * <p>Clears the running flag and notifies {@code waitingForStateChange} while holding it. The
+   * monitor loop in run() sleeps on that object, so it wakes up, sees the cleared flag and exits
+   * without waiting out its full sleep period.
+   */
+  public void stop() {
+    synchronized (waitingForStateChange) {
+      isRunning.set(false);
+      waitingForStateChange.notifyAll();
+    }
+  }
+
+  /** Returns whether the server is currently classified as being in GC thrashing. */
+  public boolean isThrashing() {
+    return isThrashing.get();
+  }
+
+  /**
+   * Recomputes the thrashing state. The server counts as thrashing only once at least
+   * {@code NUM_MONITORED_PERIODS} samples are available and at least
+   * {@code GC_THRASHING_PERCENTAGE_PER_SERVER} percent of them were thrashing periods.
+   */
+  private void updateIsThrashing() {
+    int sampleCount = periodIsThrashing.size();
+    boolean serverInGcThrashing;
+    if (sampleCount < NUM_MONITORED_PERIODS) {
+      // Not enough history yet to classify the server as thrashing.
+      serverInGcThrashing = false;
+    } else {
+      int thrashingPeriods = 0;
+      for (Boolean periodThrashed : periodIsThrashing) {
+        if (periodThrashed) {
+          thrashingPeriods++;
+        }
+      }
+      serverInGcThrashing =
+          thrashingPeriods * 100 >= sampleCount * GC_THRASHING_PERCENTAGE_PER_SERVER;
+    }
+    setIsThrashing(serverInGcThrashing);
+  }
+
+  /**
+   * Set the thrashing state.
+   *
+   * <p>Both wait points are held while the flag is updated. Waiters re-check the flag while
+   * holding their wait point, so holding it here means they cannot miss the notification.
+   * Threads in waitForResources are woken only on a thrashing to not-thrashing transition;
+   * state-change waiters are woken on any change.
+   *
+   * <p>Lock order is waitingForResources, then waitingForStateChange. No other code in this
+   * section takes both locks.
+   */
+  private void setIsThrashing(boolean serverInGcThrashing) {
+    synchronized (waitingForResources) {
+      synchronized (waitingForStateChange) {
+        boolean prev = isThrashing.getAndSet(serverInGcThrashing);
+        // Release pushed-back callers only when thrashing has just ended.
+        if (prev && !serverInGcThrashing) {
+          waitingForResources.notifyAll();
+        }
+        if (prev != serverInGcThrashing) {
+          waitingForStateChange.notifyAll();
+        }
+      }
+    }
+  }
+
+  /**
+   * Determines if too much time was spent on garbage collection in the last period of time.
+   *
+   * <p>Also records the measured percentage in {@code lastMeasuredGCPercentage} and
+   * {@code maxGCPercentage}, which describeMemory reports.
+   *
+   * @param now The current time, in milliseconds.
+   * @param lastTimeWokeUp The last time this thread woke up, in milliseconds.
+   * @return The state of the last period of time.
+   */
+  private boolean wasLastPeriodInGCThrashing(long now, long lastTimeWokeUp) {
+    long elapsedMillis = now - lastTimeWokeUp;
+    if (elapsedMillis <= 0) {
+      // The wall clock did not advance (coarse resolution, spurious wakeup) or went backwards.
+      // Dividing by it would produce NaN, Infinity or a negative value. A NaN would stick in
+      // maxGCPercentage forever, because Math.max propagates NaN. Skip this sample and leave
+      // timeInGC untouched, so the GC time is attributed to the next period. Repeat the verdict
+      // of the last real measurement.
+      return lastMeasuredGCPercentage.get() > this.gcThrashingPercentagePerPeriod;
+    }
+
+    // Cumulative GC time as reported by the provider; the delta since the previous sample is the
+    // GC time spent during this period.
+    long inGC = gcStatsProvider.totalGCTimeMilliseconds();
+    double gcPercentage = (inGC - timeInGC) * 100.0 / elapsedMillis;
+
+    lastMeasuredGCPercentage.set(gcPercentage);
+    maxGCPercentage.set(Math.max(maxGCPercentage.get(), gcPercentage));
+    timeInGC = inGC;
+
+    return gcPercentage > this.gcThrashingPercentagePerPeriod;
+  }
+
+  /**
+   * Appends the verdict for the period that just ended to {@code periodIsThrashing}. While the
+   * window already holds {@code NUM_MONITORED_PERIODS} samples, the oldest ones are evicted
+   * first.
+   *
+   * @param now The current time.
+   * @param lastTimeWokeUp The last time this thread woke up.
+   */
+  private void updateData(long now, long lastTimeWokeUp) {
+    // Make room for the new sample by dropping the oldest ones.
+    while (periodIsThrashing.size() >= NUM_MONITORED_PERIODS) {
+      periodIsThrashing.poll();
+    }
+    periodIsThrashing.offer(wasLastPeriodInGCThrashing(now, lastTimeWokeUp));
+  }
+
+  /**
+   * Dumps the heap to a file if heap dumps are enabled.
+   *
+   * <p>Before dumping, the memory held by {@code reservedForDumpingHeap} is released so the dump
+   * has some headroom. It is never re-reserved: doing so while memory is tight would be risky,
+   * and leaving it free may itself relieve some of the pressure.
+   *
+   * @return the file the heap was dumped to, or {@literal null} if heap dumps are disabled or the
+   *     dump failed.
+   */
+  public @Nullable File tryToDumpHeap() {
+    if (canDumpHeap) {
+      // Release the reserve so the dump itself has memory to work with.
+      reservedForDumpingHeap = null;
+      try {
+        return dumpHeap();
+      } catch (Exception e) {
+        LOG.warn("Unable to dump heap: ", e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Attempts a heap dump, logs the memory state and the dump outcome, then terminates the JVM
+   * with exit status 1.
+   */
+  @SuppressFBWarnings("DM_EXIT") // Exiting the JVM under sustained GC thrashing is deliberate.
+  private void shutDownDueToGcThrashing(int thrashingCount) {
+    File heapDumpFile = tryToDumpHeap();
+    String heapDumpStatus =
+        (heapDumpFile == null) ? "not written" : ("written to '" + heapDumpFile + "'");
+    LOG.error(
+        "Shutting down JVM after {} consecutive periods of measured GC thrashing. "
+            + "Memory is {}. Heap dump {}.",
+        thrashingCount,
+        describeMemory(),
+        heapDumpStatus);
+    System.exit(1);
+  }
+
+  /**
+   * Runs the monitor loop on the calling thread until stop() is called or the thread is
+   * interrupted.
+   *
+   * <p>Start-up:
+   *
+   * <ul>
+   *   <li>Marks the monitor as running, failing if it already is.
+   *   <li>Wakes threads blocked on {@code waitingForStateChange}.
+   *   <li>If {@code gcThrashingPercentagePerPeriod} is outside (0, 100), marks itself as not
+   *       running. It still attempts the heap dump upload and waits one sleep period before the
+   *       loop notices and exits.
+   * </ul>
+   *
+   * <p>Each iteration:
+   *
+   * <ul>
+   *   <li>Waits up to {@code sleepTimeMillis}, or until notified.
+   *   <li>Records whether the elapsed period was a GC thrashing period and recomputes the
+   *       thrashing state.
+   *   <li>Logs memory when more than {@code NORMAL_LOGGING_PERIOD_MILLIS} has passed since the
+   *       last log.
+   *   <li>If {@code shutDownAfterNumGCThrashing} is positive, exits the JVM after that many
+   *       consecutive thrashing iterations.
+   * </ul>
+   */
+  @Override
+  public void run() {
+    synchronized (waitingForStateChange) {
+      Preconditions.checkState(!isRunning.getAndSet(true), "already running");
+
+      if (this.gcThrashingPercentagePerPeriod <= 0 || this.gcThrashingPercentagePerPeriod >= 100) {
+        LOG.warn(
+            "gcThrashingPercentagePerPeriod: {} is not valid value. Not starting MemoryMonitor.",
+            this.gcThrashingPercentagePerPeriod);
+        isRunning.set(false);
+      }
+
+      // Wake waitForRunning() callers (and anyone else waiting for a state change).
+      waitingForStateChange.notifyAll();
+    }
+
+    // Within the memory monitor thread check to see if there is a pre-existing heap dump, and
+    // attempt to upload it. Note that this will delay the first memory monitor check.
+    tryUploadHeapDumpIfItExists();
+
+    try {
+      long lastTimeWokeUp = System.currentTimeMillis();
+      long lastLog = -1;
+      int currentThrashingCount = 0;
+      while (true) {
+        // Doubles as an interruptible sleep that stop() can cut short via notifyAll.
+        // NOTE(review): per Object.wait(long), a sleepTimeMillis of 0 waits until notified.
+        synchronized (waitingForStateChange) {
+          waitingForStateChange.wait(sleepTimeMillis);
+        }
+        if (!isRunning.get()) {
+          break;
+        }
+        long now = System.currentTimeMillis();
+
+        updateData(now, lastTimeWokeUp);
+        updateIsThrashing();
+
+        if (lastLog < 0 || lastLog + NORMAL_LOGGING_PERIOD_MILLIS < now) {
+          LOG.info("Memory is {}", describeMemory());
+          lastLog = now;
+        }
+
+        if (isThrashing.get()) {
+          currentThrashingCount++;
+
+          // A non-positive shutDownAfterNumGCThrashing disables the shutdown.
+          if (shutDownAfterNumGCThrashing > 0
+              && (currentThrashingCount >= shutDownAfterNumGCThrashing)) {
+            shutDownDueToGcThrashing(currentThrashingCount);
+          }
+        } else {
+          // Reset the counter whenever the server is evaluated not under gc thrashing.
+          currentThrashingCount = 0;
+        }
+
+        lastTimeWokeUp = now;
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      // most probably means that the server is shutting down
+      // in any case, there's not much we can do here
+      LOG.info("The GCThrashingMonitor was interrupted.");
+    }
+  }
+
+  /**
+   * Returns only when the server is not in the GC thrashing state.
+   *
+   * <p>If the server is not thrashing, this returns immediately. Otherwise the call is counted as
+   * a pushback, logged with {@code context}, and blocks until the thrashing state clears.
+   * Interrupts received while blocked do not end the wait; they are re-asserted before returning.
+   */
+  public void waitForResources(String context) {
+    if (isThrashing.get()) {
+      numPushbacks.incrementAndGet();
+      LOG.info("Waiting for resources for {}. Memory is {}", context, describeMemory());
+      boolean wasInterrupted = false;
+      synchronized (waitingForResources) {
+        // Thrashing may already have ended since the check above; the loop handles that.
+        while (isThrashing.get()) {
+          try {
+            waitingForResources.wait();
+          } catch (InterruptedException e1) {
+            wasInterrupted = true;
+            LOG.debug("waitForResources was interrupted.");
+          }
+        }
+      }
+      if (wasInterrupted) {
+        Thread.currentThread().interrupt();
+      }
+      LOG.info("Resources granted for {}. Memory is {}", context, describeMemory());
+    }
+  }
+
+  /**
+   * Returns the JVM temp directory ({@code java.io.tmpdir}). fromOptions uses it as the local
+   * heap dump folder.
+   */
+  private static File getLoggingDir() {
+    String tmpDir = System.getProperty("java.io.tmpdir");
+    return new File(tmpDir);
+  }
+
+  /**
+   * Dump the current heap profile to {@code heap_dump.hprof} in this monitor's local dump folder
+   * and return that file.
+   *
+   * <p>NOTE: We deliberately don't salt the heap dump filename so as to minimize disk impact of
+   * repeated dumps. These files can be of comparable size to the local disk.
+   */
+  public File dumpHeap()
+      throws MalformedObjectNameException, InstanceNotFoundException, ReflectionException,
+          MBeanException, IOException {
+    return dumpHeap(localDumpFolder);
+  }
+
+  /**
+   * Dumps this JVM's heap to {@code heap_dump.hprof} inside {@code directory} and returns that
+   * file. Any existing file of that name is deleted first.
+   *
+   * <p>All objects are dumped, not only live ones. The filename is deliberately not salted, so
+   * repeated dumps replace each other instead of accumulating; a dump can be comparable in size
+   * to the local disk.
+   *
+   * @throws IOException if an existing dump cannot be deleted or its permissions cannot be set.
+   */
+  @VisibleForTesting
+  static synchronized File dumpHeap(File directory)
+      throws MalformedObjectNameException, InstanceNotFoundException, ReflectionException,
+          MBeanException, IOException {
+    File dumpFile = new File(directory, "heap_dump.hprof");
+    boolean staleDumpRemains = dumpFile.exists() && !dumpFile.delete();
+    if (staleDumpRemains) {
+      throw new IOException("heap_dump.hprof already existed and couldn't be deleted!");
+    }
+
+    // Ask the HotSpot diagnostic MBean to write the dump; 'false' means unreachable objects are
+    // included as well.
+    ObjectName hotSpotDiagnostic = new ObjectName("com.sun.management:type=HotSpotDiagnostic");
+    Object[] arguments = {dumpFile.getPath(), false};
+    String[] signature = {String.class.getName(), boolean.class.getName()};
+    ManagementFactory.getPlatformMBeanServer()
+        .invoke(hotSpotDiagnostic, "dumpHeap", arguments, signature);
+
+    boolean posix =
+        java.nio.file.FileSystems.getDefault().supportedFileAttributeViews().contains("posix");
+    if (posix) {
+      // NOTE(review): this grants read access to group and others, so any local user who can
+      // reach the directory can read the dump, which may contain sensitive in-memory data.
+      // Confirm this is intended.
+      Files.setPosixFilePermissions(
+          dumpFile.toPath(),
+          ImmutableSet.of(
+              PosixFilePermission.OWNER_READ,
+              PosixFilePermission.GROUP_READ,
+              PosixFilePermission.OTHERS_READ));
+    } else {
+      dumpFile.setReadable(true, true);
+    }
+
+    LOG.warn("Heap dumped to {}", dumpFile);
+    return dumpFile;
+  }
+
+  /**
+   * Returns a one-line summary of the server's memory state. It covers heap used/total/max in MB,
+   * the last and maximum measured GC percentages, the pushback count, and whether the server is
+   * currently thrashing.
+   */
+  public String describeMemory() {
+    Runtime rt = Runtime.getRuntime();
+    long maxMb = rt.maxMemory() >> 20;
+    long totalBytes = rt.totalMemory();
+    long usedMb = (totalBytes - rt.freeMemory()) >> 20;
+    long totalMb = totalBytes >> 20;
+    return String.format(
+        "used/total/max = %d/%d/%d MB, GC last/max = %.2f/%.2f %%, #pushbacks=%d, gc thrashing=%s",
+        usedMb,
+        totalMb,
+        maxMb,
+        lastMeasuredGCPercentage.get(),
+        maxGCPercentage.get(),
+        numPushbacks.get(),
+        isThrashing.get());
+  }
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/package-info.java
new file mode 100644
index 0000000..c3ec179
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Worker status client and memory monitor for the Java SDK harness. */
+package org.apache.beam.fn.harness.status;
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 17f37d7..97469c4 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -227,7 +227,7 @@ public class ProcessBundleHandlerTest {
     }
 
     @Override
-    ExecutionStateTracker getStateTracker() {
+    public ExecutionStateTracker getStateTracker() {
       return wrappedBundleProcessor.getStateTracker();
     }
 
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
new file mode 100644
index 0000000..7905a2c
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.status;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.stringContainsInOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessorCache;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.WorkerStatusRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.WorkerStatusResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc.BeamFnWorkerStatusImplBase;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BeamFnStatusClientTest {
+  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+      Endpoints.ApiServiceDescriptor.newBuilder()
+          .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+          .build();
+
+  @Test
+  public void testActiveBundleState() {
+    ProcessBundleHandler handler = mock(ProcessBundleHandler.class);
+    BundleProcessorCache processorCache = mock(BundleProcessorCache.class);
+    Map<String, BundleProcessor> bundleProcessorMap = new HashMap<>();
+    for (int i = 0; i < 11; i++) {
+      BundleProcessor processor = mock(BundleProcessor.class);
+      ExecutionStateTracker executionStateTracker = mock(ExecutionStateTracker.class);
+      when(processor.getStateTracker()).thenReturn(executionStateTracker);
+      when(executionStateTracker.getMillisSinceLastTransition())
+          .thenReturn(Integer.toUnsignedLong((10 - i) * 1000));
+      when(executionStateTracker.getTrackedThread()).thenReturn(Thread.currentThread());
+      String instruction = Integer.toString(i);
+      when(processorCache.find(instruction)).thenReturn(processor);
+      bundleProcessorMap.put(instruction, processor);
+    }
+    when(handler.getBundleProcessorCache()).thenReturn(processorCache);
+    when(processorCache.getActiveBundleProcessors()).thenReturn(bundleProcessorMap);
+
+    ManagedChannelFactory channelFactory = InProcessManagedChannelFactory.create();
+    BeamFnStatusClient client =
+        new BeamFnStatusClient(
+            apiServiceDescriptor,
+            channelFactory::forDescriptor,
+            handler.getBundleProcessorCache(),
+            PipelineOptionsFactory.create());
+    StringJoiner joiner = new StringJoiner("\n");
+    joiner.add(client.getActiveProcessBundleState());
+    String actualState = joiner.toString();
+
+    List<String> expectedInstructions = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      expectedInstructions.add(String.format("Instruction %d", i));
+    }
+    assertThat(actualState, stringContainsInOrder(expectedInstructions));
+    assertThat(actualState, not(containsString("Instruction 10")));
+  }
+
+  @Test
+  public void testWorkerStatusResponse() throws Exception {
+    BlockingQueue<WorkerStatusResponse> values = new LinkedBlockingQueue<>();
+    BlockingQueue<StreamObserver<WorkerStatusRequest>> requestObservers =
+        new LinkedBlockingQueue<>();
+    StreamObserver<WorkerStatusResponse> inboundServerObserver =
+        TestStreams.withOnNext(values::add).build();
+    Server server =
+        InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+            .addService(
+                new BeamFnWorkerStatusImplBase() {
+                  @Override
+                  public StreamObserver<WorkerStatusResponse> workerStatus(
+                      StreamObserver<WorkerStatusRequest> responseObserver) {
+                    Uninterruptibles.putUninterruptibly(requestObservers, responseObserver);
+                    return inboundServerObserver;
+                  }
+                })
+            .build();
+    server.start();
+
+    try {
+      BundleProcessorCache processorCache = mock(BundleProcessorCache.class);
+      when(processorCache.getActiveBundleProcessors()).thenReturn(Collections.emptyMap());
+      ManagedChannelFactory channelFactory = InProcessManagedChannelFactory.create();
+      BeamFnStatusClient client =
+          new BeamFnStatusClient(
+              apiServiceDescriptor,
+              channelFactory::forDescriptor,
+              processorCache,
+              PipelineOptionsFactory.create());
+      StreamObserver<WorkerStatusRequest> requestObserver = requestObservers.take();
+      requestObserver.onNext(WorkerStatusRequest.newBuilder().setId("id").build());
+      WorkerStatusResponse response = values.take();
+      assertThat(response.getStatusInfo(), containsString("No active processing bundles."));
+      assertThat(response.getId(), is("id"));
+    } finally {
+      server.shutdownNow();
+    }
+  }
+}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/MemoryMonitorTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/MemoryMonitorTest.java
new file mode 100644
index 0000000..2fa407c
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/MemoryMonitorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.status;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test the memory monitor will block threads when the server is in a (faked) GC thrashing state.
+ */
+@RunWith(JUnit4.class)
+public class MemoryMonitorTest {
+
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  static class FakeGCStatsProvider implements MemoryMonitor.GCStatsProvider {
+    AtomicBoolean inGCThrashingState = new AtomicBoolean(false);
+    long lastCallTimestamp = System.currentTimeMillis();
+    long lastGCResult = 0;
+
+    @Override
+    public long totalGCTimeMilliseconds() {
+      if (inGCThrashingState.get()) {
+        long now = System.currentTimeMillis();
+        lastGCResult += now - lastCallTimestamp;
+        lastCallTimestamp = now;
+      }
+      return lastGCResult;
+    }
+  }
+
+  private FakeGCStatsProvider provider;
+  private File localDumpFolder;
+  private MemoryMonitor monitor;
+  private Thread thread;
+
+  @Before
+  public void setup() throws IOException {
+    provider = new FakeGCStatsProvider();
+    localDumpFolder = tempFolder.newFolder();
+    // Update every 10ms, never shutdown VM.
+    monitor = MemoryMonitor.forTest(provider, 10, 0, false, 50.0, null, localDumpFolder);
+    thread = new Thread(monitor);
+    thread.start();
+  }
+
+  @Test(timeout = 1000)
+  public void detectGCThrashing() throws InterruptedException {
+    monitor.waitForRunning();
+    monitor.waitForResources("Test1");
+    provider.inGCThrashingState.set(true);
+    monitor.waitForThrashingState(true);
+    final Semaphore s = new Semaphore(0);
+    new Thread(
+            () -> {
+              monitor.waitForResources("Test2");
+              s.release();
+            })
+        .start();
+    assertFalse(s.tryAcquire(100, TimeUnit.MILLISECONDS));
+    provider.inGCThrashingState.set(false);
+    monitor.waitForThrashingState(false);
+    assertTrue(s.tryAcquire(100, TimeUnit.MILLISECONDS));
+    monitor.waitForResources("Test3");
+  }
+
+  @Test
+  public void heapDumpOnce() throws Exception {
+    File folder = tempFolder.newFolder();
+
+    File dump1 = MemoryMonitor.dumpHeap(folder);
+    assertNotNull(dump1);
+    assertTrue(dump1.exists());
+    assertThat(dump1.getParentFile(), Matchers.equalTo(folder));
+  }
+
+  @Test
+  public void heapDumpTwice() throws Exception {
+    File folder = tempFolder.newFolder();
+
+    File dump1 = MemoryMonitor.dumpHeap(folder);
+    assertNotNull(dump1);
+    assertTrue(dump1.exists());
+    assertThat(dump1.getParentFile(), Matchers.equalTo(folder));
+
+    File dump2 = MemoryMonitor.dumpHeap(folder);
+    assertNotNull(dump2);
+    assertTrue(dump2.exists());
+    assertThat(dump2.getParentFile(), Matchers.equalTo(folder));
+  }
+
+  @Test
+  public void uploadFile() throws Exception {
+    File remoteFolder = tempFolder.newFolder();
+    monitor =
+        MemoryMonitor.forTest(provider, 10, 0, true, 50.0, remoteFolder.getPath(), localDumpFolder);
+
+    // Force the monitor to generate a local heap dump
+    monitor.dumpHeap();
+
+    // Try to upload the heap dump
+    assertTrue(monitor.tryUploadHeapDumpIfItExists());
+
+    File[] files = remoteFolder.listFiles();
+    assertThat(files, Matchers.arrayWithSize(1));
+    assertThat(files[0].getAbsolutePath(), Matchers.containsString("heap_dump"));
+    assertThat(files[0].getAbsolutePath(), Matchers.containsString("hprof"));
+  }
+
+  @Test
+  public void uploadFileDisabled() throws Exception {
+    monitor = MemoryMonitor.forTest(provider, 10, 0, true, 50.0, null, localDumpFolder);
+
+    // Force the monitor to generate a local heap dump
+    monitor.dumpHeap();
+
+    // Try to upload the heap dump
+    assertFalse(monitor.tryUploadHeapDumpIfItExists());
+  }
+
+  @Test
+  public void disableMemoryMonitor() throws Exception {
+    MemoryMonitor disabledMonitor =
+        MemoryMonitor.forTest(provider, 10, 0, true, 100.0, null, localDumpFolder);
+    Thread disabledMonitorThread = new Thread(disabledMonitor);
+    disabledMonitorThread.start();
+
+    // Monitor thread should stop quickly after starting. Wait 10 seconds, and check that monitor
+    // thread is not alive.
+    disabledMonitorThread.join(10000);
+    assertFalse(disabledMonitorThread.isAlive());
+
+    // Enabled monitor thread should still be running.
+    assertTrue(thread.isAlive());
+  }
+}