You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2022/10/12 16:31:01 UTC

[beam] branch master updated: Migrate GcsOptions#getExecutorService to an unbounded ScheduledExecutorService (#23545)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cb512ee289 Migrate GcsOptions#getExecutorService to an unbounded ScheduledExecutorService (#23545)
3cb512ee289 is described below

commit 3cb512ee2893b5710e0ebebaa0e739eb086f7e78
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Oct 12 09:30:53 2022 -0700

    Migrate GcsOptions#getExecutorService to an unbounded ScheduledExecutorService (#23545)
    
    This is a preliminary piece for issue #21368
---
 .../runners/dataflow/DataflowPipelineJobTest.java  |  21 +-
 .../runners/dataflow/util/PackageUtilTest.java     |   8 +-
 .../dataflow/worker/BatchDataflowWorkerTest.java   |   2 +-
 .../worker/DataflowBatchWorkerHarnessTest.java     |   2 +-
 .../worker/DataflowWorkUnitClientTest.java         |   2 +-
 .../java/org/apache/beam/sdk/util/NanoClock.java}  |  40 +-
 .../util/UnboundedScheduledExecutorService.java    | 504 ++++++++++++++++++++
 .../beam/sdk}/util/FastNanoClockAndSleeper.java    |  13 +-
 .../sdk}/util/FastNanoClockAndSleeperTest.java     |   2 +-
 .../UnboundedScheduledExecutorServiceTest.java     | 505 +++++++++++++++++++++
 .../sdk/extensions/gcp/options/GcsOptions.java     |  18 +-
 .../sdk/extensions/gcp/GcpCoreApiSurfaceTest.java  |   3 +-
 .../beam/sdk/extensions/gcp/util/GcsUtilTest.java  |  23 +-
 .../gcp/util/RetryHttpRequestInitializerTest.java  |   6 +-
 .../io/gcp/bigquery/BigQueryServicesImplTest.java  |   8 +-
 15 files changed, 1076 insertions(+), 81 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index d45b3c6ebcd..b8bc2722809 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -48,9 +48,9 @@ import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
 import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.Duration;
@@ -149,7 +149,8 @@ public class DataflowPipelineJobTest {
         new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());
 
     State state =
-        job.waitUntilFinish(Duration.standardMinutes(5), jobHandler, fastClock, fastClock);
+        job.waitUntilFinish(
+            Duration.standardMinutes(5), jobHandler, fastClock::sleep, fastClock::nanoTime);
     assertEquals(null, state);
   }
 
@@ -169,7 +170,8 @@ public class DataflowPipelineJobTest {
     DataflowPipelineJob job =
         new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());
 
-    return job.waitUntilFinish(Duration.standardMinutes(1), null, fastClock, fastClock);
+    return job.waitUntilFinish(
+        Duration.standardMinutes(1), null, fastClock::sleep, fastClock::nanoTime);
   }
 
   /**
@@ -251,7 +253,9 @@ public class DataflowPipelineJobTest {
         new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());
 
     long startTime = fastClock.nanoTime();
-    State state = job.waitUntilFinish(Duration.standardMinutes(5), null, fastClock, fastClock);
+    State state =
+        job.waitUntilFinish(
+            Duration.standardMinutes(5), null, fastClock::sleep, fastClock::nanoTime);
     assertEquals(null, state);
     long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
     checkValidInterval(
@@ -271,7 +275,8 @@ public class DataflowPipelineJobTest {
     DataflowPipelineJob job =
         new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, ImmutableMap.of());
     long startTime = fastClock.nanoTime();
-    State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock);
+    State state =
+        job.waitUntilFinish(Duration.millis(4), null, fastClock::sleep, fastClock::nanoTime);
     assertEquals(null, state);
     long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
     // Should only have slept for the 4 ms allowed.
@@ -318,7 +323,7 @@ public class DataflowPipelineJobTest {
         State.RUNNING,
         job.getStateWithRetriesOrUnknownOnException(
             BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
-            fastClock));
+            fastClock::sleep));
   }
 
   @Test
@@ -335,7 +340,7 @@ public class DataflowPipelineJobTest {
     thrown.expect(IOException.class);
     job.getStateWithRetries(
         BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
-        fastClock);
+        fastClock::sleep);
   }
 
   @Test
@@ -354,7 +359,7 @@ public class DataflowPipelineJobTest {
         State.UNKNOWN,
         job.getStateWithRetriesOrUnknownOnException(
             BackOffAdapter.toGcpBackOff(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff()),
-            fastClock));
+            fastClock::sleep));
     long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime);
     checkValidInterval(
         DataflowPipelineJob.STATUS_POLLING_INTERVAL,
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index ca5dd511219..144d6cec7c5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -68,7 +68,6 @@ import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
 import org.apache.beam.runners.dataflow.util.PackageUtil.StagedFile;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
 import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.StorageObjectOrIOException;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
@@ -79,6 +78,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.RegexMatcher;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.ZipFiles;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -413,7 +413,7 @@ public class PackageUtilTest {
       directPackageUtil.stageClasspathElements(
           ImmutableList.of(makeStagedFile(tmpFile.getAbsolutePath())),
           STAGING_PATH,
-          fastNanoClockAndSleeper,
+          fastNanoClockAndSleeper::sleep,
           createOptions);
     } finally {
       verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
@@ -441,7 +441,7 @@ public class PackageUtilTest {
       directPackageUtil.stageClasspathElements(
           ImmutableList.of(makeStagedFile(tmpFile.getAbsolutePath())),
           STAGING_PATH,
-          fastNanoClockAndSleeper,
+          fastNanoClockAndSleeper::sleep,
           createOptions);
       fail("Expected RuntimeException");
     } catch (RuntimeException e) {
@@ -484,7 +484,7 @@ public class PackageUtilTest {
       directPackageUtil.stageClasspathElements(
           ImmutableList.of(makeStagedFile(tmpFile.getAbsolutePath())),
           STAGING_PATH,
-          fastNanoClockAndSleeper,
+          fastNanoClockAndSleeper::sleep,
           createOptions);
     } finally {
       verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
index 6b55d313ddf..d9ccda85733 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
@@ -39,8 +39,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.util.TimeUtil;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.hamcrest.Description;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarnessTest.java
index 34517435ad0..31052fb49be 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarnessTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarnessTest.java
@@ -30,10 +30,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.extensions.gcp.util.Transport;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
index b96ee2fb84c..a7b53956e65 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
@@ -38,10 +38,10 @@ import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
 import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.extensions.gcp.util.Transport;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeper.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NanoClock.java
similarity index 51%
copy from sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeper.java
copy to sdks/java/core/src/main/java/org/apache/beam/sdk/util/NanoClock.java
index dd6f92cb988..1f58f25b931 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeper.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NanoClock.java
@@ -15,33 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.gcp.util;
-
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
-import org.junit.rules.ExternalResource;
-import org.junit.rules.TestRule;
+package org.apache.beam.sdk.util;
 
 /**
- * This object quickly moves time forward based upon how much it has been asked to sleep, without
- * actually sleeping, to simulate the backoff.
+ * Nano clock which can be used to measure elapsed time in nanoseconds.
+ *
+ * <p>The default system implementation can be accessed at {@link #SYSTEM}. Alternative
+ * implementations may be used for testing.
  */
-public class FastNanoClockAndSleeper extends ExternalResource
-    implements NanoClock, Sleeper, TestRule {
-  private long fastNanoTime;
-
-  @Override
-  public long nanoTime() {
-    return fastNanoTime;
-  }
+@FunctionalInterface
+interface NanoClock {
 
-  @Override
-  protected void before() throws Throwable {
-    fastNanoTime = SYSTEM.nanoTime();
-  }
+  /**
+   * Returns the current value of the most precise available system timer, in nanoseconds for use to
+   * measure elapsed time, to match the behavior of {@link System#nanoTime()}.
+   */
+  long nanoTime();
 
-  @Override
-  public void sleep(long millis) throws InterruptedException {
-    fastNanoTime += millis * 1000000L;
-  }
+  /**
+   * Provides the default System implementation of a nano clock by using {@link System#nanoTime()}.
+   */
+  NanoClock SYSTEM = System::nanoTime;
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java
new file mode 100644
index 00000000000..57a1a829f10
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorService.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.KeyForBottom;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An unbounded {@link ScheduledExecutorService} based upon the {@link ScheduledThreadPoolExecutor}
+ * API contract.
+ *
+ * <p>Note that this implementation differs from a {@link ScheduledThreadPoolExecutor} in the
+ * following ways:
+ *
+ * <ul>
+ *   <li>The core pool size is always 0.
+ *   <li>Any work that is immediately executable is given to a thread before returning from the
+ *       corresponding {@code execute}, {@code submit}, {@code schedule*} methods.
+ *   <li>An unbounded number of threads can be started.
+ * </ul>
+ */
+public final class UnboundedScheduledExecutorService implements ScheduledExecutorService {
+
+  /**
+   * A {@link FutureTask} that handles periodically rescheduling tasks.
+   *
+   * <p>Note that it is important that this class extends {@link FutureTask} and {@link
+   * RunnableScheduledFuture} to be compatible with the types of objects returned by a {@link
+   * ScheduledThreadPoolExecutor}.
+   */
+  @VisibleForTesting
+  @SuppressFBWarnings(
+      value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification =
+          "Default equals/hashCode is what we want since two scheduled tasks are only equivalent if they point to the same instance.")
+  final class ScheduledFutureTask<@Nullable @KeyForBottom V> extends FutureTask<V>
+      implements RunnableScheduledFuture<V> {
+
+    /** Sequence number to break ties FIFO. */
+    private final long sequenceNumber;
+
+    /** The time the task is enabled to execute in nanoTime units. */
+    private long time;
+
+    /**
+     * Period in nanoseconds for repeating tasks. A positive value indicates fixed-rate execution. A
+     * negative value indicates fixed-delay execution. A value of 0 indicates a non-repeating
+     * (one-shot) task.
+     */
+    private final long period;
+
+    /** Creates a one-shot action with given nanoTime-based trigger time. */
+    ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime) {
+      this(r, result, triggerTime, 0);
+    }
+
+    /** Creates a periodic action with given nanoTime-based initial trigger time and period. */
+    @SuppressWarnings("argument.type.incompatible")
+    ScheduledFutureTask(Runnable r, @Nullable V result, long triggerTime, long period) {
+      super(r, result);
+      this.time = triggerTime;
+      this.period = period;
+      this.sequenceNumber = sequencer.getAndIncrement();
+    }
+
+    /** Creates a one-shot action with given nanoTime-based trigger time. */
+    ScheduledFutureTask(Callable<V> callable, long triggerTime) {
+      super(callable);
+      this.time = triggerTime;
+      this.period = 0;
+      this.sequenceNumber = sequencer.getAndIncrement();
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(LongMath.saturatedSubtract(time, clock.nanoTime()), NANOSECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed other) {
+      if (other == this) // compare zero if same object
+      {
+        return 0;
+      }
+      if (other instanceof ScheduledFutureTask) {
+        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
+        int diff = Longs.compare(time, x.time);
+        if (diff != 0) {
+          return diff;
+        }
+        if (sequenceNumber < x.sequenceNumber) {
+          return -1;
+        }
+        return 1;
+      }
+      long diff = LongMath.saturatedSubtract(getDelay(NANOSECONDS), other.getDelay(NANOSECONDS));
+      return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+    }
+
+    @Override
+    public boolean isPeriodic() {
+      return period != 0;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      boolean cancelled = super.cancel(mayInterruptIfRunning);
+      synchronized (tasks) {
+        tasks.remove(this);
+      }
+      return cancelled;
+    }
+
+    /** Overrides {@link FutureTask} so as to reset/requeue if periodic. */
+    @Override
+    public void run() {
+      boolean periodic = isPeriodic();
+      if (!periodic) {
+        super.run();
+      } else if (super.runAndReset()) {
+        // Set the next runtime
+        if (period > 0) {
+          time = LongMath.saturatedAdd(time, period);
+        } else {
+          time = triggerTime(-period);
+        }
+        synchronized (tasks) {
+          tasks.add(this);
+          tasks.notify();
+        }
+      }
+    }
+  }
+
+  // Used to break ties in ordering of future tasks that are scheduled for the same time
+  // so that they have a consistent ordering based upon their insertion order into
+  // this ScheduledExecutorService.
+  private final AtomicLong sequencer = new AtomicLong();
+
+  private final NanoClock clock;
+  private final ThreadPoolExecutor threadPoolExecutor;
+  @VisibleForTesting final PriorityQueue<ScheduledFutureTask<?>> tasks;
+  private final AbstractExecutorService invokeMethodsAdapter;
+  private final Future<?> launchTasks;
+
+  public UnboundedScheduledExecutorService() {
+    this(NanoClock.SYSTEM);
+  }
+
+  @VisibleForTesting
+  UnboundedScheduledExecutorService(NanoClock clock) {
+    this.clock = clock;
+    ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
+    threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
+    threadFactoryBuilder.setDaemon(true);
+
+    this.threadPoolExecutor =
+        new ThreadPoolExecutor(
+            0,
+            Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
+            Long.MAX_VALUE,
+            TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
+            new SynchronousQueue<>(),
+            threadFactoryBuilder.build());
+
+    // Create an internal adapter so that execute does not re-wrap the ScheduledFutureTask again
+    this.invokeMethodsAdapter =
+        new AbstractExecutorService() {
+
+          @Override
+          protected <@KeyForBottom T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+            return new ScheduledFutureTask<>(runnable, value, 0);
+          }
+
+          @Override
+          protected <@KeyForBottom T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+            return new ScheduledFutureTask<>(callable, 0);
+          }
+
+          @Override
+          public void shutdown() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public List<Runnable> shutdownNow() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean isShutdown() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean isTerminated() {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          /* UnboundedScheduledExecutorService is the only caller after it has been initialized.*/
+          @SuppressWarnings("method.invocation.invalid")
+          public void execute(Runnable command) {
+            // These are already guaranteed to be a ScheduledFutureTask so there is no need to wrap
+            // it in another ScheduledFutureTask.
+            threadPoolExecutor.execute(command);
+          }
+        };
+    this.tasks = new PriorityQueue<>();
+    this.launchTasks =
+        threadPoolExecutor.submit(new TaskLauncher(tasks, threadPoolExecutor, clock));
+  }
+
+  private static class TaskLauncher implements Callable<Void> {
+    private final PriorityQueue<ScheduledFutureTask<?>> tasks;
+    private final ThreadPoolExecutor threadPoolExecutor;
+    private final NanoClock clock;
+
+    private TaskLauncher(
+        PriorityQueue<ScheduledFutureTask<?>> tasks,
+        ThreadPoolExecutor threadPoolExecutor,
+        NanoClock clock) {
+      this.tasks = tasks;
+      this.threadPoolExecutor = threadPoolExecutor;
+      this.clock = clock;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      while (true) {
+        synchronized (tasks) {
+          if (threadPoolExecutor.isShutdown()) {
+            return null;
+          }
+          ScheduledFutureTask<?> task = tasks.peek();
+          if (task == null) {
+            tasks.wait();
+            continue;
+          }
+          long nanosToWait = LongMath.saturatedSubtract(task.time, clock.nanoTime());
+          if (nanosToWait > 0) {
+            long millisToWait = nanosToWait / 1_000_000;
+            int nanosRemainder = (int) (nanosToWait % 1_000_000);
+            tasks.wait(millisToWait, nanosRemainder);
+            continue;
+          }
+          // Remove the task from the queue since it is ready to be scheduled now
+          task = tasks.remove();
+          threadPoolExecutor.execute(task);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    threadPoolExecutor.shutdown();
+    synchronized (tasks) {
+      // Notify tasks which checks to see if the ThreadPoolExecutor is shutdown and exits cleanly.
+      tasks.notify();
+    }
+
+    // Re-throw any errors during shutdown of the launchTasks thread.
+    try {
+      launchTasks.get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    shutdown();
+    synchronized (tasks) {
+      List<Runnable> rval = new ArrayList<>(tasks);
+      tasks.clear();
+      rval.addAll(threadPoolExecutor.shutdownNow());
+      return rval;
+    }
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return threadPoolExecutor.isShutdown();
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return threadPoolExecutor.isTerminated();
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return threadPoolExecutor.awaitTermination(timeout, unit);
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    if (command == null) {
+      throw new NullPointerException();
+    }
+    ScheduledFutureTask<Void> task = new ScheduledFutureTask<>(command, null, triggerTime(0));
+    threadPoolExecutor.execute(task);
+  }
+
+  @Override
+  public Future<@Nullable ?> submit(Runnable command) {
+    if (command == null) {
+      throw new NullPointerException();
+    }
+    ScheduledFutureTask<Void> task = new ScheduledFutureTask<>(command, null, triggerTime(0));
+    threadPoolExecutor.execute(task);
+    return task;
+  }
+
+  @Override
+  /* Ignore improper flag since FB detects that ScheduledExecutorService can't have nullable V. */
+  @SuppressWarnings("override.return.invalid")
+  public <@Nullable @KeyForBottom T> Future<T> submit(Runnable command, T result) {
+    if (command == null) {
+      throw new NullPointerException();
+    }
+    ScheduledFutureTask<T> task = new ScheduledFutureTask<>(command, result, triggerTime(0));
+    runNowOrScheduleInTheFuture(task);
+    return task;
+  }
+
+  @Override
+  /* Ignore improper flag since FB detects that ScheduledExecutorService can't have nullable V. */
+  @SuppressWarnings({"override.param.invalid", "override.return.invalid"})
+  public <@Nullable @KeyForBottom T> Future<T> submit(Callable<T> command) {
+    if (command == null) {
+      throw new NullPointerException();
+    }
+    ScheduledFutureTask<T> task = new ScheduledFutureTask<>(command, triggerTime(0));
+    threadPoolExecutor.execute(task);
+    return task;
+  }
+
+  @Override
+  public <@KeyForBottom T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    return invokeMethodsAdapter.invokeAll(tasks);
+  }
+
+  @Override
+  public <@KeyForBottom T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    if (tasks == null || unit == null) {
+      throw new NullPointerException();
+    }
+    return invokeMethodsAdapter.invokeAll(tasks, timeout, unit);
+  }
+
+  @Override
+  public <@KeyForBottom T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    return invokeMethodsAdapter.invokeAny(tasks);
+  }
+
+  @Override
+  public <@KeyForBottom T> T invokeAny(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    return invokeMethodsAdapter.invokeAny(tasks, timeout, unit);
+  }
+
+  @Override
+  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+    if (command == null || unit == null) {
+      throw new NullPointerException();
+    }
+    ScheduledFutureTask<Void> task =
+        new ScheduledFutureTask<>(command, null, triggerTime(delay, unit));
+    runNowOrScheduleInTheFuture(task);
+    return task;
+  }
+
+  @Override
+  /* Ignore improper flag since FB detects that ScheduledExecutorService can't have nullable V. */
+  @SuppressWarnings({"override.param.invalid", "override.return.invalid"})
+  public <@Nullable @KeyForBottom V> ScheduledFuture<V> schedule(
+      Callable<V> callable, long delay, TimeUnit unit) {
+    if (callable == null || unit == null) {
+      throw new NullPointerException();
+    }
+    ScheduledFutureTask<V> task = new ScheduledFutureTask<>(callable, triggerTime(delay, unit));
+    runNowOrScheduleInTheFuture(task);
+    return task;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(
+      Runnable command, long initialDelay, long period, TimeUnit unit) {
+    if (command == null || unit == null) {
+      throw new NullPointerException();
+    }
+    if (period <= 0) {
+      throw new IllegalArgumentException();
+    }
+    ScheduledFutureTask<Void> task =
+        new ScheduledFutureTask<Void>(
+            command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
+    runNowOrScheduleInTheFuture(task);
+    return task;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(
+      Runnable command, long initialDelay, long delay, TimeUnit unit) {
+    if (command == null || unit == null) {
+      throw new NullPointerException();
+    }
+    if (delay <= 0) {
+      throw new IllegalArgumentException();
+    }
+    ScheduledFutureTask<Void> task =
+        new ScheduledFutureTask<>(
+            command, null, triggerTime(initialDelay, unit), -unit.toNanos(delay));
+    runNowOrScheduleInTheFuture(task);
+    return task;
+  }
+
+  private <@Nullable @KeyForBottom T> void runNowOrScheduleInTheFuture(
+      ScheduledFutureTask<T> task) {
+    long nanosToWait = LongMath.saturatedSubtract(task.time, clock.nanoTime());
+    if (nanosToWait <= 0) {
+      threadPoolExecutor.execute(task);
+      return;
+    }
+
+    synchronized (tasks) {
+      if (isShutdown()) {
+        threadPoolExecutor
+            .getRejectedExecutionHandler()
+            .rejectedExecution(task, threadPoolExecutor);
+      }
+      tasks.add(task);
+      tasks.notify();
+    }
+  }
+
+  /** Returns the nanoTime-based trigger time of a delayed action. */
+  private long triggerTime(long delay, TimeUnit unit) {
+    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
+  }
+
+  /** Returns the nanoTime-based trigger time of a delayed action. */
+  private long triggerTime(long delay) {
+    return LongMath.saturatedAdd(clock.nanoTime(), delay);
+  }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
similarity index 82%
rename from sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeper.java
rename to sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
index dd6f92cb988..331b2256f5a 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeper.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeper.java
@@ -15,10 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.gcp.util;
+package org.apache.beam.sdk.util;
 
-import com.google.api.client.util.NanoClock;
-import com.google.api.client.util.Sleeper;
+import java.util.concurrent.atomic.AtomicLong;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TestRule;
 
@@ -28,20 +27,20 @@ import org.junit.rules.TestRule;
  */
 public class FastNanoClockAndSleeper extends ExternalResource
     implements NanoClock, Sleeper, TestRule {
-  private long fastNanoTime;
+  private AtomicLong fastNanoTime = new AtomicLong();
 
   @Override
   public long nanoTime() {
-    return fastNanoTime;
+    return fastNanoTime.get();
   }
 
   @Override
   protected void before() throws Throwable {
-    fastNanoTime = SYSTEM.nanoTime();
+    fastNanoTime = new AtomicLong(NanoClock.SYSTEM.nanoTime());
   }
 
   @Override
   public void sleep(long millis) throws InterruptedException {
-    fastNanoTime += millis * 1000000L;
+    fastNanoTime.addAndGet(millis * 1000000L);
   }
 }
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeperTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
similarity index 97%
rename from sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeperTest.java
rename to sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
index 88a55e7234f..03f958850e9 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/FastNanoClockAndSleeperTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FastNanoClockAndSleeperTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.gcp.util;
+package org.apache.beam.sdk.util;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
new file mode 100644
index 00000000000..ededf69c914
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.util.UnboundedScheduledExecutorService.ScheduledFutureTask;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.hamcrest.core.IsIterableContaining;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Tests for {@link UnboundedScheduledExecutorService}. */
+@RunWith(JUnit4.class)
+public class UnboundedScheduledExecutorServiceTest {
+
+  private static final Runnable RUNNABLE =
+      () -> {
+        // no-op
+      };
+  private static final Callable<String> CALLABLE = () -> "A";
+
+  private static final Callable<String> FAILING_CALLABLE =
+      () -> {
+        throw new Exception("Test");
+      };
+
+  @Test
+  public void testScheduleMethodErrorChecking() throws Exception {
+    FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+    UnboundedScheduledExecutorService executorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+    UnboundedScheduledExecutorService shutdownExecutorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+    shutdownExecutorService.shutdown();
+
+    assertThrows(
+        NullPointerException.class, () -> executorService.schedule((Runnable) null, 10, SECONDS));
+    assertThrows(NullPointerException.class, () -> executorService.schedule(RUNNABLE, 10, null));
+    assertThrows(
+        RejectedExecutionException.class,
+        () -> shutdownExecutorService.schedule(RUNNABLE, 10, SECONDS));
+
+    assertThrows(
+        NullPointerException.class,
+        () -> executorService.schedule((Callable<String>) null, 10, SECONDS));
+    assertThrows(NullPointerException.class, () -> executorService.schedule(CALLABLE, 10, null));
+    assertThrows(
+        RejectedExecutionException.class,
+        () -> shutdownExecutorService.schedule(CALLABLE, 10, SECONDS));
+
+    assertThrows(
+        NullPointerException.class,
+        () -> executorService.scheduleAtFixedRate(null, 10, 10, SECONDS));
+    assertThrows(
+        NullPointerException.class,
+        () -> executorService.scheduleAtFixedRate(RUNNABLE, 10, 10, null));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> executorService.scheduleAtFixedRate(RUNNABLE, 10, -10, SECONDS));
+    assertThrows(
+        RejectedExecutionException.class,
+        () -> shutdownExecutorService.scheduleAtFixedRate(RUNNABLE, 10, 10, SECONDS));
+
+    assertThrows(
+        NullPointerException.class,
+        () -> executorService.scheduleWithFixedDelay((Runnable) null, 10, 10, SECONDS));
+    assertThrows(
+        NullPointerException.class,
+        () -> executorService.scheduleWithFixedDelay(RUNNABLE, 10, 10, null));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> executorService.scheduleWithFixedDelay(RUNNABLE, 10, -10, SECONDS));
+    assertThrows(
+        RejectedExecutionException.class,
+        () -> shutdownExecutorService.scheduleWithFixedDelay(RUNNABLE, 10, 10, SECONDS));
+
+    assertThat(executorService.shutdownNow(), empty());
+    assertThat(executorService.shutdownNow(), empty());
+  }
+
+  @Test
+  public void testSubmitMethodErrorChecking() throws Exception {
+    FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+    UnboundedScheduledExecutorService executorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+    UnboundedScheduledExecutorService shutdownExecutorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+    shutdownExecutorService.shutdown();
+
+    assertThrows(NullPointerException.class, () -> executorService.submit(null, "result"));
+    assertThrows(
+        RejectedExecutionException.class, () -> shutdownExecutorService.submit(RUNNABLE, "result"));
+
+    assertThrows(NullPointerException.class, () -> executorService.submit((Runnable) null));
+    assertThrows(RejectedExecutionException.class, () -> shutdownExecutorService.submit(RUNNABLE));
+
+    assertThrows(NullPointerException.class, () -> executorService.submit((Callable<String>) null));
+    assertThrows(RejectedExecutionException.class, () -> shutdownExecutorService.submit(CALLABLE));
+
+    assertThat(executorService.shutdownNow(), empty());
+    assertThat(executorService.shutdownNow(), empty());
+  }
+
+  @Test
+  public void testInvokeMethodErrorChecking() throws Exception {
+    FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+    UnboundedScheduledExecutorService executorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+    UnboundedScheduledExecutorService shutdownExecutorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+    shutdownExecutorService.shutdown();
+
+    assertThrows(NullPointerException.class, () -> executorService.invokeAll(null));
+    assertThrows(
+        NullPointerException.class, () -> executorService.invokeAll(Collections.singleton(null)));
+    assertThrows(
+        RejectedExecutionException.class,
+        () -> shutdownExecutorService.invokeAll(Collections.singleton(CALLABLE)));
+
+    assertThrows(NullPointerException.class, () -> executorService.invokeAll(null, 10, SECONDS));
+    assertThrows(
+        NullPointerException.class,
+        () -> executorService.invokeAll(Collections.singleton(null), 10, SECONDS));
+    assertThrows(
+        NullPointerException.class,
+        () -> executorService.invokeAll(Collections.singleton(CALLABLE), 10, null));
+    assertThrows(
+        RejectedExecutionException.class,
+        () -> shutdownExecutorService.invokeAll(Collections.singleton(CALLABLE), 10, SECONDS));
+
+    assertThrows(NullPointerException.class, () -> executorService.invokeAny(null));
+    assertThrows(
+        NullPointerException.class, () -> executorService.invokeAny(Collections.singleton(null)));
+    assertThrows(
+        IllegalArgumentException.class, () -> executorService.invokeAny(Collections.emptyList()));
+    assertThrows(
+        ExecutionException.class,
+        () -> executorService.invokeAny(Arrays.asList(FAILING_CALLABLE, FAILING_CALLABLE)));
+    assertThrows(
+        RejectedExecutionException.class,
+        () -> shutdownExecutorService.invokeAny(Collections.singleton(CALLABLE)));
+
+    assertThrows(NullPointerException.class, () -> executorService.invokeAny(null, 10, SECONDS));
+    assertThrows(
+        NullPointerException.class,
+        () -> executorService.invokeAny(Collections.singleton(null), 10, SECONDS));
+    assertThrows(
+        NullPointerException.class,
+        () -> executorService.invokeAny(Collections.singleton(CALLABLE), 10, null));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> executorService.invokeAny(Collections.emptyList(), 10, SECONDS));
+    assertThrows(
+        ExecutionException.class,
+        () ->
+            executorService.invokeAny(
+                Arrays.asList(FAILING_CALLABLE, FAILING_CALLABLE), 10, SECONDS));
+    assertThrows(
+        RejectedExecutionException.class,
+        () -> shutdownExecutorService.invokeAny(Collections.singleton(CALLABLE), 10, SECONDS));
+
+    assertThat(executorService.shutdownNow(), empty());
+    assertThat(executorService.shutdownNow(), empty());
+  }
+
+  @Test
+  public void testExecuteMethodErrorChecking() throws Exception {
+    FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+    UnboundedScheduledExecutorService executorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+    UnboundedScheduledExecutorService shutdownExecutorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+    shutdownExecutorService.shutdown();
+
+    assertThrows(NullPointerException.class, () -> executorService.execute(null));
+    assertThrows(RejectedExecutionException.class, () -> shutdownExecutorService.execute(RUNNABLE));
+
+    assertThat(executorService.shutdownNow(), empty());
+    assertThat(executorService.shutdownNow(), empty());
+  }
+
+  @Test
+  public void testAllMethodsReturnScheduledFutures() throws Exception {
+    FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+    UnboundedScheduledExecutorService executorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+    assertThat(executorService.submit(RUNNABLE), instanceOf(ScheduledFutureTask.class));
+    assertThat(executorService.submit(CALLABLE), instanceOf(ScheduledFutureTask.class));
+    assertThat(executorService.submit(RUNNABLE, "Answer"), instanceOf(ScheduledFutureTask.class));
+
+    assertThat(
+        executorService.schedule(RUNNABLE, 10, SECONDS), instanceOf(ScheduledFutureTask.class));
+    assertThat(
+        executorService.schedule(CALLABLE, 10, SECONDS), instanceOf(ScheduledFutureTask.class));
+    assertThat(
+        executorService.scheduleAtFixedRate(RUNNABLE, 10, 10, SECONDS),
+        instanceOf(ScheduledFutureTask.class));
+    assertThat(
+        executorService.scheduleWithFixedDelay(RUNNABLE, 10, 10, SECONDS),
+        instanceOf(ScheduledFutureTask.class));
+
+    assertThat(
+        executorService.invokeAll(Arrays.asList(CALLABLE, CALLABLE)),
+        IsIterableContainingInOrder.contains(
+            instanceOf(ScheduledFutureTask.class), instanceOf(ScheduledFutureTask.class)));
+    assertThat(
+        executorService.invokeAll(Arrays.asList(CALLABLE, CALLABLE), 10, SECONDS),
+        IsIterableContainingInOrder.contains(
+            instanceOf(ScheduledFutureTask.class), instanceOf(ScheduledFutureTask.class)));
+
+    executorService.shutdownNow();
+  }
+
+  @Test
+  public void testShutdown() throws Exception {
+    FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+    UnboundedScheduledExecutorService executorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+    Runnable runnable1 = Mockito.mock(Runnable.class);
+    Runnable runnable2 = Mockito.mock(Runnable.class);
+    Runnable runnable3 = Mockito.mock(Runnable.class);
+    Callable<?> callable1 = Mockito.mock(Callable.class);
+
+    Future<?> rFuture1 = executorService.schedule(runnable1, 10, SECONDS);
+    Future<?> cFuture1 = executorService.schedule(callable1, 10, SECONDS);
+    Future<?> rFuture2 = executorService.scheduleAtFixedRate(runnable2, 10, 10, SECONDS);
+    Future<?> rFuture3 = executorService.scheduleWithFixedDelay(runnable3, 10, 10, SECONDS);
+
+    assertThat(
+        executorService.shutdownNow(),
+        IsIterableContaining.hasItems(
+            (Runnable) rFuture1, (Runnable) rFuture2, (Runnable) rFuture3, (Runnable) cFuture1));
+    verifyNoInteractions(runnable1, runnable2, runnable3, callable1);
+
+    assertTrue(executorService.isShutdown());
+    assertTrue(executorService.awaitTermination(10, SECONDS));
+    assertTrue(executorService.isTerminated());
+  }
+
+  @Test
+  public void testExecute() throws Exception {
+    FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+    UnboundedScheduledExecutorService executorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+    AtomicInteger callCount = new AtomicInteger();
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+
+    executorService.execute(
+        () -> {
+          callCount.incrementAndGet();
+          countDownLatch.countDown();
+        });
+
+    countDownLatch.await();
+    assertEquals(1, callCount.get());
+  }
+
+  @Test
+  public void testSubmit() throws Exception {
+    List<AtomicInteger> callCounts = new ArrayList<>();
+    List<ScheduledFutureTask<?>> futures = new ArrayList<>();
+
+    FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+    UnboundedScheduledExecutorService executorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+    callCounts.add(new AtomicInteger());
+    futures.add(
+        (ScheduledFutureTask<?>)
+            executorService.submit(
+                (Runnable) callCounts.get(callCounts.size() - 1)::incrementAndGet));
+    callCounts.add(new AtomicInteger());
+    futures.add(
+        (ScheduledFutureTask<?>)
+            executorService.submit(
+                callCounts.get(callCounts.size() - 1)::incrementAndGet, "Result"));
+    callCounts.add(new AtomicInteger());
+    futures.add(
+        (ScheduledFutureTask<?>)
+            executorService.submit(callCounts.get(callCounts.size() - 1)::incrementAndGet));
+
+    assertNull(futures.get(0).get());
+    assertEquals("Result", futures.get(1).get());
+    assertEquals(1, futures.get(2).get());
+
+    for (int i = 0; i < callCounts.size(); ++i) {
+      assertFalse(futures.get(i).isPeriodic());
+      assertEquals(1, callCounts.get(i).get());
+    }
+  }
+
+  @Test
+  public void testSchedule() throws Exception {
+    List<AtomicInteger> callCounts = new ArrayList<>();
+    List<ScheduledFutureTask<?>> futures = new ArrayList<>();
+
+    FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+    UnboundedScheduledExecutorService executorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+    callCounts.add(new AtomicInteger());
+    futures.add(
+        (ScheduledFutureTask<?>)
+            executorService.schedule(
+                (Runnable) callCounts.get(callCounts.size() - 1)::incrementAndGet,
+                100,
+                MILLISECONDS));
+    callCounts.add(new AtomicInteger());
+    futures.add(
+        (ScheduledFutureTask<?>)
+            executorService.schedule(
+                callCounts.get(callCounts.size() - 1)::incrementAndGet, 100, MILLISECONDS));
+
+    // No tasks should have been picked up
+    wakeUpAndCheckTasks(executorService);
+    for (int i = 0; i < callCounts.size(); ++i) {
+      assertEquals(0, callCounts.get(i).get());
+    }
+
+    // No tasks should have been picked up even if the time advances 99 seconds
+    fastNanoClockAndSleeper.sleep(99);
+    wakeUpAndCheckTasks(executorService);
+    for (int i = 0; i < callCounts.size(); ++i) {
+      assertEquals(0, callCounts.get(i).get());
+    }
+
+    // All tasks should wake up and pick-up tasks
+    fastNanoClockAndSleeper.sleep(1);
+    wakeUpAndCheckTasks(executorService);
+
+    assertNull(futures.get(0).get());
+    assertEquals(1, futures.get(1).get());
+
+    for (int i = 0; i < callCounts.size(); ++i) {
+      assertFalse(futures.get(i).isPeriodic());
+      assertEquals(1, callCounts.get(i).get());
+    }
+
+    assertThat(executorService.shutdownNow(), empty());
+  }
+
+  @Test
+  public void testSchedulePeriodicWithFixedDelay() throws Exception {
+    FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+    UnboundedScheduledExecutorService executorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+    AtomicInteger callCount = new AtomicInteger();
+    CountDownLatch latch = new CountDownLatch(1);
+
+    ScheduledFutureTask<?> future =
+        (ScheduledFutureTask<?>)
+            executorService.scheduleWithFixedDelay(
+                () -> {
+                  callCount.incrementAndGet();
+                  latch.countDown();
+                },
+                100,
+                50,
+                MILLISECONDS);
+
+    // No tasks should have been picked up
+    wakeUpAndCheckTasks(executorService);
+    assertEquals(0, callCount.get());
+
+    // No tasks should have been picked up even if the time advances 99 seconds
+    fastNanoClockAndSleeper.sleep(99);
+    wakeUpAndCheckTasks(executorService);
+    assertEquals(0, callCount.get());
+
+    // We should have picked up the task 1 time, next task should be scheduled in 50 even though we
+    // advanced to 109
+    fastNanoClockAndSleeper.sleep(10);
+    wakeUpAndCheckTasks(executorService);
+    latch.await();
+    assertEquals(1, callCount.get());
+
+    for (; ; ) {
+      synchronized (executorService.tasks) {
+        ScheduledFutureTask<?> task = executorService.tasks.peek();
+        if (task != null) {
+          assertEquals(50, task.getDelay(MILLISECONDS));
+          break;
+        }
+      }
+      Thread.sleep(1);
+    }
+
+    assertTrue(future.isPeriodic());
+    assertFalse(future.isDone());
+
+    future.cancel(true);
+    assertTrue(future.isCancelled());
+    assertTrue(future.isDone());
+
+    // Cancelled tasks should not be returned during shutdown
+    assertThat(executorService.shutdownNow(), empty());
+  }
+
+  @Test
+  public void testSchedulePeriodicWithFixedRate() throws Exception {
+    FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
+    UnboundedScheduledExecutorService executorService =
+        new UnboundedScheduledExecutorService(fastNanoClockAndSleeper);
+
+    AtomicInteger callCount = new AtomicInteger();
+    CountDownLatch latch = new CountDownLatch(1);
+
+    ScheduledFutureTask<?> future =
+        (ScheduledFutureTask<?>)
+            executorService.scheduleAtFixedRate(
+                () -> {
+                  callCount.incrementAndGet();
+                  latch.countDown();
+                },
+                100,
+                50,
+                MILLISECONDS);
+
+    // No tasks should have been picked up
+    wakeUpAndCheckTasks(executorService);
+    assertEquals(0, callCount.get());
+
+    // No tasks should have been picked up even if the time advances 99 seconds
+    fastNanoClockAndSleeper.sleep(99);
+    wakeUpAndCheckTasks(executorService);
+    assertEquals(0, callCount.get());
+
+    // We should have picked up the task 1 time, next task should be scheduled in 41 since we
+    // advanced to 109
+    fastNanoClockAndSleeper.sleep(10);
+    wakeUpAndCheckTasks(executorService);
+    latch.await();
+    assertEquals(1, callCount.get());
+
+    for (; ; ) {
+      synchronized (executorService.tasks) {
+        ScheduledFutureTask<?> task = executorService.tasks.peek();
+        if (task != null) {
+          assertEquals(41, task.getDelay(MILLISECONDS));
+          break;
+        }
+      }
+      Thread.sleep(1);
+    }
+
+    assertTrue(future.isPeriodic());
+    assertFalse(future.isDone());
+
+    future.cancel(true);
+    assertTrue(future.isCancelled());
+    assertTrue(future.isDone());
+
+    // Cancelled tasks should not be returned during shutdown
+    assertThat(executorService.shutdownNow(), empty());
+  }
+
+  void wakeUpAndCheckTasks(UnboundedScheduledExecutorService executorService) throws Exception {
+    synchronized (executorService.tasks) {
+      executorService.tasks.notify();
+    }
+    Thread.sleep(100);
+  }
+}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
index d183d1647e1..0b14b244da5 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -20,9 +20,6 @@ package org.apache.beam.sdk.extensions.gcp.options;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
@@ -35,8 +32,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.beam.sdk.util.UnboundedScheduledExecutorService;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Options used to configure Google Cloud Storage. */
@@ -134,12 +130,8 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline
    * ExecutorService} is compatible with AppEngine.
    */
   class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> {
-    @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only.
     @Override
     public ExecutorService create(PipelineOptions options) {
-      ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
-      threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
-      threadFactoryBuilder.setDaemon(true);
       /* The SDK requires an unbounded thread pool because a step may create X writers
        * each requiring their own thread to perform the writes otherwise a writer may
        * block causing deadlock for the step because the writers buffer is full.
@@ -147,13 +139,7 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline
        * them in forward order thus requiring enough threads so that each step's writers
        * can be active.
        */
-      return new ThreadPoolExecutor(
-          0,
-          Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-          Long.MAX_VALUE,
-          TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
-          new SynchronousQueue<>(),
-          threadFactoryBuilder.build());
+      return new UnboundedScheduledExecutorService();
     }
   }
 
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
index 5014bd7370c..4ce530de4c2 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
@@ -58,8 +58,7 @@ public class GcpCoreApiSurfaceTest {
             classesInPackage("java"),
             classesInPackage("javax"),
             classesInPackage("org.apache.beam.sdk"),
-            classesInPackage("org.joda.time"),
-            classesInPackage("org.junit"));
+            classesInPackage("org.joda.time"));
 
     assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses));
   }
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index 33a87c6d0ee..26b270d5faf 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -102,6 +102,7 @@ import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -457,7 +458,7 @@ public class GcsUtilTest {
             .getObject(
                 GcsPath.fromComponents("testbucket", "testobject"),
                 mockBackOff,
-                new FastNanoClockAndSleeper())
+                new FastNanoClockAndSleeper()::sleep)
             .getSize()
             .longValue());
     assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis());
@@ -713,7 +714,7 @@ public class GcsUtilTest {
         .thenThrow(new SocketTimeoutException("SocketException"))
         .thenReturn(new Bucket());
 
-    gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
+    gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()::sleep);
   }
 
   @Test
@@ -741,7 +742,7 @@ public class GcsUtilTest {
 
     thrown.expect(AccessDeniedException.class);
 
-    gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
+    gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()::sleep);
   }
 
   @Test
@@ -767,7 +768,7 @@ public class GcsUtilTest {
         gcsUtil.bucketAccessible(
             GcsPath.fromComponents("testbucket", "testobject"),
             mockBackOff,
-            new FastNanoClockAndSleeper()));
+            new FastNanoClockAndSleeper()::sleep));
   }
 
   @Test
@@ -796,7 +797,7 @@ public class GcsUtilTest {
         gcsUtil.bucketAccessible(
             GcsPath.fromComponents("testbucket", "testobject"),
             mockBackOff,
-            new FastNanoClockAndSleeper()));
+            new FastNanoClockAndSleeper()::sleep));
   }
 
   @Test
@@ -823,7 +824,7 @@ public class GcsUtilTest {
         gcsUtil.bucketAccessible(
             GcsPath.fromComponents("testbucket", "testobject"),
             mockBackOff,
-            new FastNanoClockAndSleeper()));
+            new FastNanoClockAndSleeper()::sleep));
   }
 
   @Test
@@ -848,7 +849,7 @@ public class GcsUtilTest {
     gcsUtil.verifyBucketAccessible(
         GcsPath.fromComponents("testbucket", "testobject"),
         mockBackOff,
-        new FastNanoClockAndSleeper());
+        new FastNanoClockAndSleeper()::sleep);
   }
 
   @Test(expected = AccessDeniedException.class)
@@ -876,7 +877,7 @@ public class GcsUtilTest {
     gcsUtil.verifyBucketAccessible(
         GcsPath.fromComponents("testbucket", "testobject"),
         mockBackOff,
-        new FastNanoClockAndSleeper());
+        new FastNanoClockAndSleeper()::sleep);
   }
 
   @Test(expected = FileNotFoundException.class)
@@ -902,7 +903,7 @@ public class GcsUtilTest {
     gcsUtil.verifyBucketAccessible(
         GcsPath.fromComponents("testbucket", "testobject"),
         mockBackOff,
-        new FastNanoClockAndSleeper());
+        new FastNanoClockAndSleeper()::sleep);
   }
 
   @Test
@@ -928,7 +929,7 @@ public class GcsUtilTest {
         gcsUtil.getBucket(
             GcsPath.fromComponents("testbucket", "testobject"),
             mockBackOff,
-            new FastNanoClockAndSleeper()));
+            new FastNanoClockAndSleeper()::sleep));
   }
 
   @Test
@@ -956,7 +957,7 @@ public class GcsUtilTest {
     gcsUtil.getBucket(
         GcsPath.fromComponents("testbucket", "testobject"),
         mockBackOff,
-        new FastNanoClockAndSleeper());
+        new FastNanoClockAndSleeper()::sleep);
   }
 
   @Test
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializerTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializerTest.java
index 8bcca0491ea..a604b97cd35 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializerTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializerTest.java
@@ -55,6 +55,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
 import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
@@ -314,7 +315,10 @@ public class RetryHttpRequestInitializerTest {
                 transport,
                 Transport.getJsonFactory(),
                 new RetryHttpRequestInitializer(
-                    fakeClockAndSleeper, fakeClockAndSleeper, Collections.emptyList(), null))
+                    fakeClockAndSleeper::nanoTime,
+                    fakeClockAndSleeper::sleep,
+                    Collections.emptyList(),
+                    null))
             .build();
 
     Get getRequest = storage.objects().get("gs://fake", "file");
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index a57299b0a0d..176bac79da7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -91,7 +91,6 @@ import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
 import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
-import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.extensions.gcp.util.Transport;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
@@ -102,6 +101,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.values.FailsafeValueInSingleWindow;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -248,7 +248,7 @@ public class BigQueryServicesImplTest {
           when(response.getContent()).thenReturn(toStream(testJob));
         });
 
-    Sleeper sleeper = new FastNanoClockAndSleeper();
+    Sleeper sleeper = new FastNanoClockAndSleeper()::sleep;
     JobServiceImpl.startJob(
         testJob,
         new ApiErrorExtractor(),
@@ -277,7 +277,7 @@ public class BigQueryServicesImplTest {
           when(response.getStatusCode()).thenReturn(409); // 409 means already exists
         });
 
-    Sleeper sleeper = new FastNanoClockAndSleeper();
+    Sleeper sleeper = new FastNanoClockAndSleeper()::sleep;
     JobServiceImpl.startJob(
         testJob,
         new ApiErrorExtractor(),
@@ -312,7 +312,7 @@ public class BigQueryServicesImplTest {
           when(response.getContent()).thenReturn(toStream(testJob));
         });
 
-    Sleeper sleeper = new FastNanoClockAndSleeper();
+    Sleeper sleeper = new FastNanoClockAndSleeper()::sleep;
     JobServiceImpl.startJob(
         testJob,
         new ApiErrorExtractor(),