You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by sc...@apache.org on 2019/01/29 18:42:36 UTC

[beam] branch master updated: [BEAM-6431] Move ExecutionStateSampler and ExecutionStateTracker into runners-core-java

This is an automated email from the ASF dual-hosted git repository.

scott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ca2a73f  [BEAM-6431] Move ExecutionStateSampler and ExecutionStateTracker into runners-core-java
     new 52e7328  Merge pull request #7634: [BEAM-6431] Move state sampler files to runners-core so they can be used in the Java SDK as well
ca2a73f is described below

commit ca2a73fd99d713f96e0344fcc56d56e76168d42e
Author: Alex Amato <aj...@google.com>
AuthorDate: Fri Jan 25 18:41:45 2019 -0800

    [BEAM-6431] Move ExecutionStateSampler and ExecutionStateTracker into runners-core-java
---
 .../core/metrics}/ExecutionStateSampler.java       |  9 +++++----
 .../core/metrics}/ExecutionStateTracker.java       | 23 ++++++++++++++++++----
 .../core/metrics}/ExecutionStateSamplerTest.java   |  6 +++---
 .../dataflow/worker/BatchDataflowWorker.java       |  2 +-
 .../dataflow/worker/BatchModeExecutionContext.java |  2 +-
 .../worker/ChunkingShuffleBatchReader.java         |  4 ++--
 .../dataflow/worker/ContextActivationObserver.java |  2 +-
 .../worker/DataflowElementExecutionTracker.java    |  2 +-
 .../dataflow/worker/DataflowExecutionContext.java  |  4 ++--
 .../worker/DataflowExecutionStateRegistry.java     |  2 +-
 .../dataflow/worker/DataflowMapTaskExecutor.java   |  2 +-
 .../dataflow/worker/DataflowMetricsContainer.java  |  2 +-
 .../dataflow/worker/DataflowOperationContext.java  |  4 ++--
 .../dataflow/worker/GroupingShuffleReader.java     |  4 ++--
 .../dataflow/worker/IntrinsicMapTaskExecutor.java  |  2 +-
 ...nmentContextActivationObserverRegistration.java |  2 +-
 .../beam/runners/dataflow/worker/ShuffleSink.java  |  4 ++--
 .../dataflow/worker/StreamingDataflowWorker.java   |  4 ++--
 .../worker/StreamingModeExecutionContext.java      |  4 ++--
 .../dataflow/worker/WorkItemStatusClient.java      |  4 ++--
 .../WorkerCustomSourceOperationExecutor.java       |  4 ++--
 .../worker/fn/control/BeamFnMapTaskExecutor.java   |  2 +-
 .../logging/DataflowWorkerLoggingHandler.java      |  4 ++--
 .../worker/util/common/worker/MapTaskExecutor.java |  1 +
 .../util/common/worker/ShuffleReadCounter.java     |  1 +
 .../worker/BatchModeExecutionContextTest.java      |  6 +++---
 .../ContextActivationObserverRegistryTest.java     |  2 +-
 .../worker/DataflowExecutionContextTest.java       |  2 +-
 .../worker/DataflowExecutionStateTrackerTest.java  |  4 ++--
 .../worker/DataflowOperationContextTest.java       | 10 +++++-----
 .../worker/DataflowSideInputReadCounterTest.java   |  2 +-
 .../dataflow/worker/GroupingShuffleReaderTest.java |  4 ++--
 .../worker/IntrinsicMapTaskExecutorTest.java       |  4 ++--
 .../runners/dataflow/worker/SimpleParDoFnTest.java |  2 +-
 .../worker/StreamingModeExecutionContextTest.java  |  6 +++---
 .../dataflow/worker/TestOperationContext.java      |  4 ++--
 .../dataflow/worker/WorkItemStatusClientTest.java  |  4 ++--
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  2 +-
 .../fn/control/BeamFnMapTaskExecutorTest.java      |  2 +-
 .../logging/DataflowWorkerLoggingHandlerTest.java  |  2 +-
 .../worker/GroupingShuffleEntryIteratorTest.java   |  2 ++
 .../util/common/worker/MapTaskExecutorTest.java    |  2 ++
 42 files changed, 91 insertions(+), 69 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSampler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java
similarity index 96%
rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSampler.java
rename to runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java
index d2b437c..c3fb816 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSampler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.util.common.worker;
+package org.apache.beam.runners.core.metrics;
 
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.joda.time.DateTimeUtils.MillisProvider;
@@ -66,7 +67,7 @@ public class ExecutionStateSampler {
 
   private static final long PERIOD_MS = 200;
 
-  private Future<Void> executionSamplerFuture = null;
+  @Nullable private Future<Void> executionSamplerFuture = null;
 
   /**
    * Called to start the ExecutionStateSampler. Until the returned {@link Closeable} is closed, the
@@ -141,7 +142,7 @@ public class ExecutionStateSampler {
   }
 
   /**
-   * Deregister tracker after MapTask completes
+   * Deregister tracker after MapTask completes.
    *
    * <p>This method needs to be synchronized to prevent race condition with sampling thread
    */
@@ -161,7 +162,7 @@ public class ExecutionStateSampler {
   }
 
   /**
-   * Attributing sampling time to trackers
+   * Attributing sampling time to trackers.
    *
    * <p>This method needs to be synchronized to prevent race condition with removing tracker
    */
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
similarity index 94%
rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateTracker.java
rename to runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
index 57795a0..d9fc3dd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
@@ -15,10 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.util.common.worker;
+package org.apache.beam.runners.core.metrics;
 
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.Closeable;
 import java.util.Map;
 import java.util.Objects;
@@ -29,6 +30,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
 
 /** Tracks the current state of a single execution thread. */
+@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "Intentional for performance.")
 public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> {
 
   /**
@@ -102,7 +104,7 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
   private final ExecutionStateSampler sampler;
 
   /** The thread being managed by this {@link ExecutionStateTracker}. */
-  private Thread trackedThread = null;
+  @Nullable private Thread trackedThread = null;
 
   /**
    * The current state of the thread managed by this {@link ExecutionStateTracker}.
@@ -110,7 +112,7 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
    * <p>This variable is written by the Execution thread, and read by the sampling and progress
    * reporting threads, thus it being marked volatile.
    */
-  private volatile ExecutionState currentState;
+  @Nullable private volatile ExecutionState currentState;
 
   /**
    * The current number of times that this {@link ExecutionStateTracker} has transitioned state.
@@ -141,8 +143,18 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
   }
 
   @Override
+  public boolean equals(Object o) {
+    return this == o;
+  }
+
+  @Override
+  public int hashCode() {
+    return System.identityHashCode(this);
+  }
+
+  @Override
   public int compareTo(ExecutionStateTracker o) {
-    if (this == o) {
+    if (this.equals(o)) {
       return 0;
     } else {
       return System.identityHashCode(this) - System.identityHashCode(o);
@@ -215,6 +227,9 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
    * from the execution thread.
    */
   @SuppressWarnings("NonAtomicVolatileUpdate")
+  @SuppressFBWarnings(
+      value = "VO_VOLATILE_INCREMENT",
+      justification = "Intentional for performance.")
   public Closeable enterState(ExecutionState newState) {
     // WARNING: This method is called in the hottest path, and must be kept as efficient as
     // possible. Avoid blocking, synchronizing, etc.
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSamplerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ExecutionStateSamplerTest.java
similarity index 95%
rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSamplerTest.java
rename to runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ExecutionStateSamplerTest.java
index b57cc46..073cfd1 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ExecutionStateSamplerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ExecutionStateSamplerTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.util.common.worker;
+package org.apache.beam.runners.core.metrics;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -23,12 +23,12 @@ import static org.mockito.Mockito.mock;
 
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.joda.time.DateTimeUtils.MillisProvider;
 import org.junit.Before;
 import org.junit.Test;
 
-/** Tests for {@link ExecutionStateSampler}. */
+/** Tests for {@link org.apache.beam.runners.core.metrics.ExecutionStateSampler}. */
 public class ExecutionStateSamplerTest {
 
   private MillisProvider clock;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index 9300dfd..792dcf9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -26,6 +26,7 @@ import java.util.function.Function;
 import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.SdkHarnessRegistry.SdkWorkerHarness;
@@ -47,7 +48,6 @@ import org.apache.beam.runners.dataflow.worker.graph.ReplacePgbkWithPrecombineFu
 import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
 import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.util.Weighted;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
index 6d42a10..7933002 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.metrics.CounterCell;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.core.metrics.MetricUpdates;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -36,7 +37,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsContainer;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java
index 840b585..ceb39f0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java
@@ -23,9 +23,9 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ByteArrayShufflePosition;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleBatchReader;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ShufflePosition;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserver.java
index e25ed95..10a6bd4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserver.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserver.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.dataflow.worker;
 import com.google.auto.service.AutoService;
 import java.io.Closeable;
 import java.util.ServiceLoader;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.sdk.annotations.Experimental;
 
 /**
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTracker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTracker.java
index 2db5ef7..70016bc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTracker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowElementExecutionTracker.java
@@ -29,12 +29,12 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 621cdb5..2ce168ce 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -31,13 +31,13 @@ import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateRegistry.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateRegistry.java
index e973664..133e2ad 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateRegistry.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateRegistry.java
@@ -23,9 +23,9 @@ import com.google.api.services.dataflow.model.CounterUpdate;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable;
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutor.java
index c811842..96db026 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutor.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import java.util.List;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
index bd9599b..745f9f6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Gauge;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
index d57c047..23e302e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContext.java
@@ -28,13 +28,13 @@ import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SimpleDoFnRunner;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReader.java
index e840bfb..d4a2551 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReader.java
@@ -31,13 +31,13 @@ import java.io.IOException;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ByteArrayShufflePosition;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.GroupingShuffleEntryIterator;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutor.java
index c0caa53..7aa00a1 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutor.java
@@ -20,8 +20,8 @@ package org.apache.beam.runners.dataflow.worker;
 import com.google.api.services.dataflow.model.CounterUpdate;
 import java.util.Collections;
 import java.util.List;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;
 
 /**
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsEnvironmentContextActivationObserverRegistration.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsEnvironmentContextActivationObserverRegistration.java
index 1e600f9..b9499af 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsEnvironmentContextActivationObserverRegistration.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsEnvironmentContextActivationObserverRegistration.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.dataflow.worker;
 import com.google.auto.service.AutoService;
 import java.io.Closeable;
 import java.io.IOException;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ShuffleSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ShuffleSink.java
index 11f6549..d2f2e3c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ShuffleSink.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ShuffleSink.java
@@ -20,12 +20,12 @@ package org.apache.beam.runners.dataflow.worker;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.util.RandomAccessData;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index abe792f..fb0e8bb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -62,6 +62,8 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.internal.CustomSources;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
@@ -103,8 +105,6 @@ import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
 import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 78555ca..ea5dbe7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -38,12 +38,12 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StepContext;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java
index 2d124ac..c651b73 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java
@@ -37,13 +37,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.dataflow.util.TimeUtil;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
 import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java
index ad5005b..4f87291 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourceOperationExecutor.java
@@ -23,9 +23,9 @@ import com.google.api.services.dataflow.model.SourceOperationResponse;
 import com.google.api.services.dataflow.model.SourceSplitRequest;
 import java.io.Closeable;
 import java.util.Collections;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
index cad9fca..98ec418 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
@@ -47,6 +47,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
 import org.apache.beam.runners.core.construction.metrics.MetricKey;
 import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.GaugeData;
 import org.apache.beam.runners.core.metrics.MetricUpdates;
 import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
@@ -55,7 +56,6 @@ import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
 import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitRequest;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
index 82b329d..7428390 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
@@ -40,10 +40,10 @@ import java.util.logging.Handler;
 import java.util.logging.LogRecord;
 import java.util.logging.SimpleFormatter;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.io.CountingOutputStream;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
index c729fab..07d23ef 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.util.List;
 import java.util.ListIterator;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleReadCounter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleReadCounter.java
index fcd0670..3ee2267 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleReadCounter.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleReadCounter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.worker.util.common.worker;
 
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
index 0e4c7dd..6d58f4d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
@@ -30,13 +30,13 @@ import com.google.api.services.dataflow.model.CounterStructuredName;
 import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
 import com.google.api.services.dataflow.model.CounterUpdate;
 import com.google.api.services.dataflow.model.DistributionUpdate;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.BatchModeExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.MetricName;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserverRegistryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserverRegistryTest.java
index 4c98f2e..64c30f4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserverRegistryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ContextActivationObserverRegistryTest.java
@@ -25,7 +25,7 @@ import com.google.auto.service.AutoService;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
index 8bc142e..7008dd4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue;
 import com.google.auto.service.AutoService;
 import java.io.Closeable;
 import java.io.IOException;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
index 598e994..f1f8aa3 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateTrackerTest.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock;
 
 import java.io.Closeable;
 import java.io.IOException;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.TestOperationContext.TestDataflowExecutionState;
@@ -34,8 +36,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
index 78c365f..014d3cb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowOperationContextTest.java
@@ -17,10 +17,10 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
-import static org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ABORT_STATE_NAME;
-import static org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.FINISH_STATE_NAME;
-import static org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.PROCESS_STATE_NAME;
-import static org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.START_STATE_NAME;
+import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.ABORT_STATE_NAME;
+import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.FINISH_STATE_NAME;
+import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.PROCESS_STATE_NAME;
+import static org.apache.beam.runners.core.metrics.ExecutionStateTracker.START_STATE_NAME;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.Assert.assertThat;
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SimpleDoFnRunner;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.BatchModeExecutionStateRegistry;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
@@ -43,7 +44,6 @@ import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounterTest.java
index 577a576..4197648 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounterTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounterTest.java
@@ -26,11 +26,11 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReaderTest.java
index 35ea0fa..ff53cb4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/GroupingShuffleReaderTest.java
@@ -46,6 +46,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
@@ -58,8 +60,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ByteArrayShufflePosition;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutorTestUtils;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java
index 1cf9815..67d3670 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorTest.java
@@ -42,6 +42,8 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
@@ -53,8 +55,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutorTestUtils.TestReader;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
index 2f58679..ec3d9dd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
@@ -38,13 +38,13 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution;
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
 import org.apache.beam.sdk.options.PipelineOptions;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 0486b92..6ebabc4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -43,6 +43,9 @@ import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.StateNamespaceForTest;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionState;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionStateRegistry;
@@ -50,9 +53,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.metrics.MetricsContainer;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestOperationContext.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestOperationContext.java
index e64b6a0..a58f234 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestOperationContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestOperationContext.java
@@ -19,13 +19,13 @@ package org.apache.beam.runners.dataflow.worker;
 
 import com.google.api.services.dataflow.model.CounterUpdate;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClientTest.java
index 6353667..9fcdb14 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClientTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClientTest.java
@@ -49,6 +49,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.metrics.CounterCell;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.DataflowReaderPosition;
@@ -56,8 +58,6 @@ import org.apache.beam.runners.dataflow.worker.WorkerCustomSources.BoundedSource
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index 0fcd92b..207b6d9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -67,6 +67,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -79,7 +80,6 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
 import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.sdk.Pipeline;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index af1fce2..f38d31f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -40,11 +40,11 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation;
 import org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
index 51e494a..e48291b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
@@ -30,11 +30,11 @@ import java.nio.charset.StandardCharsets;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.NameContextsForTests;
 import org.apache.beam.runners.dataflow.worker.TestOperationContext.TestDataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Timestamp;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier;
 import org.junit.After;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/GroupingShuffleEntryIteratorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/GroupingShuffleEntryIteratorTest.java
index f4ed032..c193733 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/GroupingShuffleEntryIteratorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/GroupingShuffleEntryIteratorTest.java
@@ -31,6 +31,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext;
 import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
index f1db84d..ad0b862 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutorTest.java
@@ -42,6 +42,8 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.worker.DataflowElementExecutionTracker;