Posted to commits@beam.apache.org by bh...@apache.org on 2021/12/08 21:27:08 UTC

[beam] branch master updated: [BEAM-11936] Fix errorprone warnings (#15890)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 06a5e67  [BEAM-11936] Fix errorprone warnings (#15890)
06a5e67 is described below

commit 06a5e67332aae53ea90dedb4ef6421c2a7d65035
Author: Benjamin Gonzalez <74...@users.noreply.github.com>
AuthorDate: Wed Dec 8 15:26:11 2021 -0600

    [BEAM-11936] Fix errorprone warnings (#15890)
    
    * [BEAM-11936] Fix errorprone warning: FloatingPointAssertionWithinEpsilon
    
    * [BEAM-11936] Fix errorprone warning: LockNotBeforeTry
    
    * [BEAM-11936] Fix errorprone warning: PreferJavaTimeOverload
    
    * [BEAM-11936] Fix errorprone warning: ModifiedButNotUsed
    
    * [BEAM-11936] Fix errorprone warning: UnusedNestedClass
    
    * [BEAM-11936] Remove suppressWarnings
    
    * [BEAM-11936] Fix suppressed warnings
    
    * [BEAM-11936] Fix errorprone warnings after merging master
    
    * [BEAM-11936] Remove suppressWarnings errorprone: PreferJavaTimeOverload
    
    * [BEAM-11936] Remove suppressWarnings errorprone: PreferJavaTimeOverload
    
    * [BEAM-11936] Fix leftover warning errorprone: PreferJavaTimeOverload
    
    * [BEAM-11936] Fix leftover warning errorprone: PreferJavaTimeOverload
    
    * [BEAM-11936] Add suppressWarnings
    
    * [BEAM-11936] Remove unused inner class
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   6 -
 .../apache/beam/examples/snippets/Snippets.java    |   8 ++
 .../core/construction/PTransformMatchersTest.java  |  21 ----
 .../beam/runners/direct/NanosOffsetClock.java      |   6 +-
 .../runners/direct/TransformEvaluatorRegistry.java |  16 ---
 .../runners/direct/UnboundedReadDeduplicator.java  |   3 +-
 .../runners/direct/CloningBundleFactoryTest.java   |  67 -----------
 .../flink/FlinkStreamingPipelineTranslator.java    |   4 +-
 .../flink/FlinkStreamingTransformTranslators.java  |  17 ---
 .../wrappers/streaming/DoFnOperator.java           |   2 +-
 .../state/FlinkBroadcastStateInternals.java        | 131 ---------------------
 .../streaming/ExecutableStageDoFnOperatorTest.java |   1 +
 .../beam/runners/dataflow/DataflowRunner.java      | 110 -----------------
 .../runners/dataflow/DataflowPipelineJobTest.java  |  25 ----
 .../beam/runners/dataflow/worker/ReaderCache.java  |   3 +-
 .../beam/runners/dataflow/worker/StateFetcher.java |   4 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |   2 +-
 .../fn/data/RemoteGrpcPortWriteOperation.java      |   6 +-
 .../common/worker/CachingShuffleBatchReader.java   |   4 +-
 .../control/DefaultJobBundleFactory.java           |  16 +--
 .../fnexecution/control/RemoteExecutionTest.java   |   2 -
 .../beam/runners/spark/io/MicrobatchSource.java    |   2 +-
 .../translation/utils/SideInputStorage.java        |   4 +-
 .../runners/spark/util/GlobalWatermarkHolder.java  |   4 +-
 .../beam/runners/spark/util/SideInputStorage.java  |   4 +-
 .../src/main/java/org/apache/beam/sdk/io/Read.java |   3 +-
 .../org/apache/beam/sdk/schemas/SchemaCoder.java   |  20 ----
 .../apache/beam/sdk/values/PCollectionViews.java   |  59 ----------
 .../apache/beam/sdk/coders/CoderRegistryTest.java  |   4 -
 .../apache/beam/sdk/testing/ExpectedLogsTest.java  |  10 +-
 .../beam/sdk/testing/SystemNanoTimeSleeper.java    |   4 +-
 .../sdk/transforms/reflect/DoFnSignaturesTest.java |   9 --
 .../GrowableOffsetRangeTrackerTest.java            |   2 +-
 .../core/translate/TimestampExtractTransform.java  |   8 --
 .../sql/meta/provider/kafka/BeamKafkaTable.java    |   3 +-
 .../org/apache/beam/sdk/fn/CancellableQueue.java   |   4 +-
 .../java/org/apache/beam/fn/harness/FnHarness.java |   4 +-
 .../harness/state/StateFetchingIteratorsTest.java  |   2 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  |   3 +-
 .../bigquery/StorageApiWritesShardedRecords.java   |   3 +-
 .../internal/LimitingTopicBacklogReader.java       |   6 +-
 .../beam/sdk/io/gcp/spanner/SpannerAccessor.java   |  24 ----
 .../sdk/io/hadoop/format/TestRowDBWritable.java    |  10 --
 .../beam/sdk/io/kafka/KafkaExactlyOnceSink.java    |   3 +-
 .../beam/sdk/io/kafka/KafkaUnboundedReader.java    |   4 +-
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java       |   5 +-
 .../org/apache/beam/sdk/io/xml/XmlSourceTest.java  |   4 +-
 47 files changed, 66 insertions(+), 596 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 001abca..7eafafc 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1140,18 +1140,12 @@ class BeamModulePlugin implements Plugin<Project> {
         options.errorprone.errorproneArgs.add("-Xep:EqualsGetClass:OFF")
         options.errorprone.errorproneArgs.add("-Xep:EqualsUnsafeCast:OFF")
         options.errorprone.errorproneArgs.add("-Xep:ExtendsAutoValue:OFF")
-        options.errorprone.errorproneArgs.add("-Xep:FloatingPointAssertionWithinEpsilon:OFF")
         options.errorprone.errorproneArgs.add("-Xep:JavaTimeDefaultTimeZone:OFF")
-        options.errorprone.errorproneArgs.add("-Xep:LockNotBeforeTry:OFF")
         options.errorprone.errorproneArgs.add("-Xep:MixedMutabilityReturnType:OFF")
-        options.errorprone.errorproneArgs.add("-Xep:PreferJavaTimeOverload:OFF")
-        options.errorprone.errorproneArgs.add("-Xep:ModifiedButNotUsed:OFF")
         options.errorprone.errorproneArgs.add("-Xep:ThreadPriorityCheck:OFF")
         options.errorprone.errorproneArgs.add("-Xep:TimeUnitConversionChecker:OFF")
         options.errorprone.errorproneArgs.add("-Xep:UndefinedEquals:OFF")
         options.errorprone.errorproneArgs.add("-Xep:UnnecessaryLambda:OFF")
-        options.errorprone.errorproneArgs.add("-Xep:UnusedVariable:OFF")
-        options.errorprone.errorproneArgs.add("-Xep:UnusedNestedClass:OFF")
         options.errorprone.errorproneArgs.add("-Xep:UnsafeReflectiveConstructionCast:OFF")
       }
 
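Removing these -Xep:...:OFF flags turns the corresponding errorprone checks back on for the whole build; the rest of this commit fixes or locally suppresses the resulting findings. The one re-enabled check with no hunk shown in this mail is FloatingPointAssertionWithinEpsilon, so here is a hedged, hypothetical sketch of what it catches (class and values are illustrative, not taken from the Beam sources): it flags a floating-point assertion whose tolerance is smaller than the spacing of doubles near the expected value, making the "fuzzy" check an exact comparison in disguise.

    import static org.junit.Assert.assertEquals;

    public class EpsilonSketch {
      void flagged(double actual) {
        // 1e-20 is far below the spacing of doubles near 1.0 (~2.2e-16), so this
        // "fuzzy" assertion can only ever pass when actual == 1.0 exactly.
        assertEquals(1.0, actual, 1e-20);
      }

      void fixed(double actual) {
        // Make the exact comparison explicit with a zero delta.
        assertEquals(1.0, actual, 0.0);
      }
    }
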
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index d2e78fe..1bde081 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -191,6 +191,7 @@ public class Snippets {
     }
 
     {
+      @SuppressWarnings("ModifiedButNotUsed")
       // [START BigQueryDataTypes]
       TableRow row = new TableRow();
       row.set("string", "abc");
@@ -1174,6 +1175,7 @@ public class Snippets {
     }
   }
 
+  @SuppressWarnings("unused")
   private static class BundleFinalization {
     private static class BundleFinalizationDoFn extends DoFn<String, Integer> {
       // [START BundleFinalize]
@@ -1191,6 +1193,7 @@ public class Snippets {
     }
   }
 
+  @SuppressWarnings("unused")
   private static class SplittableDoFn {
 
     private static void seekToNextRecordBoundaryInFile(
@@ -1230,6 +1233,7 @@ public class Snippets {
     }
     // [END SDF_BasicExample]
 
+    @SuppressWarnings("unused")
     private static class BasicExampleWithInitialSplitting extends FileToWordsFn {
       // [START SDF_BasicExampleWithSplitting]
       void splitRestriction(
@@ -1248,6 +1252,7 @@ public class Snippets {
       // [END SDF_BasicExampleWithSplitting]
     }
 
+    @SuppressWarnings("unused")
     private static class BasicExampleWithBadTryClaimLoop extends DoFn<String, Integer> {
       // [START SDF_BadTryClaimLoop]
       @ProcessElement
@@ -1271,6 +1276,7 @@ public class Snippets {
       // [END SDF_BadTryClaimLoop]
     }
 
+    @SuppressWarnings("unused")
     private static class CustomWatermarkEstimatorExample extends DoFn<String, Integer> {
       private static Instant currentWatermark = Instant.now();
 
@@ -1336,6 +1342,7 @@ public class Snippets {
     }
     // [END SDF_CustomWatermarkEstimator]
 
+    @SuppressWarnings("unused")
     private static class UserInitiatedCheckpointExample extends DoFn<String, Integer> {
       public static class ThrottlingException extends Exception {}
 
@@ -1398,6 +1405,7 @@ public class Snippets {
       // [END SDF_Truncate]
     }
 
+    @SuppressWarnings("unused")
     private static class GetSizeExample extends DoFn<String, Integer> {
       // [START SDF_GetSize]
       @GetSize
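
These snippet holder classes are intentionally unreferenced: they exist only so the [START ...]/[END ...] regions can be extracted into the Beam documentation. The commit therefore silences the newly enabled UnusedNestedClass findings with @SuppressWarnings("unused") instead of deleting the classes. A minimal sketch of the pattern, with hypothetical names:

    // The holder is never instantiated; it only carries an extractable region.
    @SuppressWarnings("unused")
    class ExampleSnippetHolder {
      private static class ExampleSnippet {
        // [START Example]
        // ... documentation-facing code lives here ...
        // [END Example]
      }
    }
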
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 35762bb..185b52d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -57,9 +57,7 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -74,7 +72,6 @@ import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.junit.Rule;
@@ -598,22 +595,4 @@ public class PTransformMatchersTest implements Serializable {
         ResourceHints.create(),
         p);
   }
-
-  private static class FakeFilenamePolicy extends FilenamePolicy {
-    @Override
-    public ResourceId windowedFilename(
-        int shardNumber,
-        int numShards,
-        BoundedWindow window,
-        PaneInfo paneInfo,
-        FileBasedSink.OutputFileHints outputFileHints) {
-      throw new UnsupportedOperationException("should not be called");
-    }
-
-    @Override
-    public @Nullable ResourceId unwindowedFilename(
-        int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
-      throw new UnsupportedOperationException("should not be called");
-    }
-  }
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
index f26c907..286d3d9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
 import org.joda.time.Instant;
 
 /** A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time. */
@@ -37,8 +37,6 @@ class NanosOffsetClock implements Clock {
   @Override
   public Instant now() {
     return new Instant(
-        baseMillis
-            + TimeUnit.MILLISECONDS.convert(
-                System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS));
+        baseMillis + Duration.ofNanos(System.nanoTime() - nanosAtBaseMillis).toMillis());
   }
 }
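
The rewrite above is safe because Duration.ofNanos(n).toMillis() and TimeUnit.MILLISECONDS.convert(n, TimeUnit.NANOSECONDS) both truncate toward zero, so they agree for the non-negative elapsed-time values used here. A self-contained check (hypothetical class name):

    import java.time.Duration;
    import java.util.concurrent.TimeUnit;

    public class ConversionCheck {
      public static void main(String[] args) {
        long nanos = 1_234_567_890L; // 1234.56789 ms of elapsed time
        long viaTimeUnit = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
        long viaDuration = Duration.ofNanos(nanos).toMillis();
        System.out.println(viaTimeUnit + " == " + viaDuration); // both print 1234
      }
    }
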
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 3d96fc7..6f750d4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -39,7 +39,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
@@ -140,21 +139,6 @@ class TransformEvaluatorRegistry {
     }
   }
 
-  /**
-   * A translator just to vend the URN. This will need to be moved to runners-core-construction-java
-   * once SDF is reorganized appropriately.
-   */
-  private static class SplittableParDoProcessElementsTranslator
-      extends TransformPayloadTranslator.NotSerializable<ProcessElements<?, ?, ?, ?, ?>> {
-
-    private SplittableParDoProcessElementsTranslator() {}
-
-    @Override
-    public String getUrn(ProcessElements<?, ?, ?, ?, ?> transform) {
-      return SPLITTABLE_PROCESS_URN;
-    }
-  }
-
   // the TransformEvaluatorFactories can construct instances of all generic types of transform,
   // so all instances of a primitive can be handled with the same evaluator factory.
   private final Map<String, TransformEvaluatorFactory> factories;
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
index b222698..125c026 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.core.construction.SplittableParDo.PrimitiveUnboundedRead;
 import org.apache.beam.runners.local.StructuralKey;
@@ -74,7 +73,7 @@ interface UnboundedReadDeduplicator {
     private CachedIdDeduplicator() {
       ids =
           CacheBuilder.newBuilder()
-              .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS)
+              .expireAfterAccess(java.time.Duration.ofMillis(MAX_RETENTION_SINCE_ACCESS))
               .maximumSize(100_000L)
               .build(new TrueBooleanLoader());
     }
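
The same PreferJavaTimeOverload rewrite recurs in the CacheBuilder call sites throughout this commit: Guava 25.0 and later (including Beam's vendored v26 copy) provide java.time.Duration overloads of expireAfterAccess and expireAfterWrite, which read better than the long-plus-TimeUnit pair. A minimal sketch against plain, non-vendored Guava:

    import java.time.Duration;
    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class CacheExpirySketch {
      // Before: the value and its unit travel as two loosely coupled arguments.
      Cache<String, String> before =
          CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();

      // After: a single self-describing Duration.
      Cache<String, String> after =
          CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build();
    }
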
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index 6a6d7df..6d7a975 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -36,9 +36,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -191,69 +189,4 @@ public class CloningBundleFactoryTest {
       throw new CoderException("Decode not allowed");
     }
   }
-
-  private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
-    @Override
-    public void encode(Record value, OutputStream outStream) throws CoderException, IOException {}
-
-    @Override
-    public Record decode(InputStream inStream) throws CoderException, IOException {
-      return new Record() {
-        @Override
-        public String toString() {
-          return "DecodedRecord";
-        }
-      };
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return true;
-    }
-
-    @Override
-    public Object structuralValue(Record value) {
-      return value;
-    }
-  }
-
-  private static class RecordNotConsistentWithEqualsStructuralValueCoder
-      extends AtomicCoder<Record> {
-    @Override
-    public void encode(Record value, OutputStream outStream) throws CoderException, IOException {}
-
-    @Override
-    public Record decode(InputStream inStream) throws CoderException, IOException {
-      return new Record() {
-        @Override
-        public String toString() {
-          return "DecodedRecord";
-        }
-      };
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return false;
-    }
-
-    @Override
-    public Object structuralValue(Record value) {
-      return value;
-    }
-  }
-
-  private static class IdentityDoFn extends DoFn<Record, Record> {
-    @ProcessElement
-    public void proc(ProcessContext ctxt) {
-      ctxt.output(ctxt.element());
-    }
-  }
-
-  private static class SimpleIdentity extends SimpleFunction<Record, Record> {
-    @Override
-    public Record apply(Record input) {
-      return input;
-    }
-  }
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index b7f99b8..bcb3883 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -21,13 +21,13 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.WR
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -368,7 +368,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
         cache =
             CacheBuilder.newBuilder()
                 .maximumSize(CACHE_MAX_SIZE)
-                .expireAfterAccess(CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS)
+                .expireAfterAccess(Duration.ofSeconds(CACHE_EXPIRE_SECONDS))
                 .build();
       }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 9379ef5..18ece76 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -1417,23 +1417,6 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
-  /**
-   * A translator just to vend the URN. This will need to be moved to runners-core-construction-java
-   * once SDF is reorganized appropriately.
-   */
-  private static class SplittableParDoProcessElementsTranslator
-      extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
-          SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?, ?>> {
-
-    private SplittableParDoProcessElementsTranslator() {}
-
-    @Override
-    public String getUrn(
-        SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?, ?> transform) {
-      return SPLITTABLE_PROCESS_URN;
-    }
-  }
-
   /** Registers classes specialized to the Flink runner. */
   @AutoService(TransformPayloadTranslatorRegistrar.class)
   public static class FlinkTransformsRegistrar implements TransformPayloadTranslatorRegistrar {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 60abddd..4f4f6c1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -1125,8 +1125,8 @@ public class DoFnOperator<InputT, OutputT>
     }
 
     private void buffer(KV<Integer, WindowedValue<?>> taggedValue) {
+      bufferLock.lock();
       try {
-        bufferLock.lock();
         pushedBackElementsHandler.pushBack(taggedValue);
       } catch (Exception e) {
         throw new RuntimeException("Couldn't pushback element.", e);
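
This hunk, and the matching ones later in RemoteGrpcPortWriteOperation and DefaultJobBundleFactory, apply the LockNotBeforeTry fix: acquire the lock immediately before the try block, so a lock() call that throws cannot be followed by an unlock() of a lock the thread never held. Both shapes, as a minimal sketch (hypothetical class name):

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class LockOrderSketch {
      private final Lock lock = new ReentrantLock();

      void flagged() {
        try {
          lock.lock(); // if this throws, the finally block still runs...
          // ... critical section ...
        } finally {
          lock.unlock(); // ...and this throws IllegalMonitorStateException,
        }                // masking the original failure
      }

      void fixed() {
        lock.lock(); // acquire first; enter the try only once the lock is held
        try {
          // ... critical section ...
        } finally {
          lock.unlock();
        }
      }
    }
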
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index e6c47ed..7b89e01 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -553,137 +553,6 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
     }
   }
 
-  private class FlinkKeyedCombiningState<K2, InputT, AccumT, OutputT>
-      extends AbstractBroadcastState<AccumT> implements CombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
-    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-
-    FlinkKeyedCombiningState(
-        OperatorStateBackend flinkStateBackend,
-        StateTag<CombiningState<InputT, AccumT, OutputT>> address,
-        Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder,
-        FlinkBroadcastStateInternals<K2> flinkStateInternals,
-        PipelineOptions pipelineOptions) {
-      super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions);
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-    }
-
-    @Override
-    public CombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      try {
-        AccumT current = readInternal();
-        if (current == null) {
-          current = combineFn.createAccumulator();
-        }
-        current = combineFn.addInput(current, value);
-        writeInternal(current);
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      try {
-        AccumT current = readInternal();
-        if (current == null) {
-          writeInternal(accum);
-        } else {
-          current = combineFn.mergeAccumulators(Arrays.asList(current, accum));
-          writeInternal(current);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      try {
-        AccumT accum = readInternal();
-        return accum != null ? accum : combineFn.createAccumulator();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(accumulators);
-    }
-
-    @Override
-    public OutputT read() {
-      try {
-        AccumT accum = readInternal();
-        if (accum != null) {
-          return combineFn.extractOutput(accum);
-        } else {
-          return combineFn.extractOutput(combineFn.createAccumulator());
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return readInternal() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      clearInternal();
-    }
-
-    @Override
-    public boolean equals(@Nullable Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkKeyedCombiningState<?, ?, ?, ?> that = (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
   private class FlinkCombiningStateWithContext<K2, InputT, AccumT, OutputT>
       extends AbstractBroadcastState<AccumT> implements CombiningState<InputT, AccumT, OutputT> {
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 4d27af6..af609e9 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -637,6 +637,7 @@ public class ExecutableStageDoFnOperatorTest {
     assertThat(statefulDoFnRunner, instanceOf(StatefulDoFnRunner.class));
   }
 
+  @SuppressWarnings("LockNotBeforeTry")
   @Test
   public void testEnsureStateCleanupWithKeyedInputCleanupTimer() {
     InMemoryTimerInternals inMemoryTimerInternals = new InMemoryTimerInternals();
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0f23d61..212e03a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
-import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
 import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
@@ -49,7 +48,6 @@ import java.io.PrintWriter;
 import java.nio.channels.Channels;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -63,7 +61,6 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.BeamUrns;
-import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
 import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
 import org.apache.beam.runners.core.construction.Environments;
@@ -71,7 +68,6 @@ import org.apache.beam.runners.core.construction.External;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
-import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
@@ -130,7 +126,6 @@ import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.Impulse;
@@ -145,7 +140,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.NameUtils;
@@ -1783,110 +1777,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
   // ================================================================================
 
-  /**
-   * A PTranform override factory which maps Create.Values PTransforms for streaming pipelines into
-   * a Dataflow specific variant.
-   */
-  private static class StreamingFnApiCreateOverrideFactory<T>
-      implements PTransformOverrideFactory<PBegin, PCollection<T>, Create.Values<T>> {
-
-    @Override
-    public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
-        AppliedPTransform<PBegin, PCollection<T>, Create.Values<T>> transform) {
-      Create.Values<T> original = transform.getTransform();
-      PCollection<T> output =
-          (PCollection) Iterables.getOnlyElement(transform.getOutputs().values());
-      return PTransformReplacement.of(
-          transform.getPipeline().begin(), new StreamingFnApiCreate<>(original, output));
-    }
-
-    @Override
-    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<T> newOutput) {
-      return ReplacementOutputs.singleton(outputs, newOutput);
-    }
-  }
-
-  /**
-   * Specialized implementation for {@link org.apache.beam.sdk.transforms.Create.Values
-   * Create.Values} for the Dataflow runner in streaming mode over the Fn API.
-   */
-  private static class StreamingFnApiCreate<T> extends PTransform<PBegin, PCollection<T>> {
-
-    private final Create.Values<T> transform;
-    private final transient PCollection<T> originalOutput;
-
-    private StreamingFnApiCreate(Create.Values<T> transform, PCollection<T> originalOutput) {
-      this.transform = transform;
-      this.originalOutput = originalOutput;
-    }
-
-    @Override
-    public final PCollection<T> expand(PBegin input) {
-      try {
-        PCollection<T> pc =
-            Pipeline.applyTransform(input, Impulse.create())
-                .apply(
-                    ParDo.of(
-                        DecodeAndEmitDoFn.fromIterable(
-                            transform.getElements(), originalOutput.getCoder())));
-        pc.setCoder(originalOutput.getCoder());
-        return pc;
-      } catch (IOException e) {
-        throw new IllegalStateException("Unable to encode elements.", e);
-      }
-    }
-
-    /**
-     * A DoFn which stores encoded versions of elements and a representation of a Coder capable of
-     * decoding those elements.
-     *
-     * <p>TODO: BEAM-2422 - Make this a SplittableDoFn.
-     */
-    private static class DecodeAndEmitDoFn<T> extends DoFn<byte[], T> {
-
-      public static <T> DecodeAndEmitDoFn<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
-          throws IOException {
-        ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
-        for (T element : elements) {
-          byte[] bytes = encodeToByteArray(elemCoder, element);
-          allElementsBytes.add(bytes);
-        }
-        return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder);
-      }
-
-      private final Collection<byte[]> elements;
-      private final RunnerApi.MessageWithComponents coderSpec;
-
-      // lazily initialized by parsing coderSpec
-      private transient Coder<T> coder;
-
-      private Coder<T> getCoder() throws IOException {
-        if (coder == null) {
-          coder =
-              (Coder)
-                  CoderTranslation.fromProto(
-                      coderSpec.getCoder(),
-                      RehydratedComponents.forComponents(coderSpec.getComponents()),
-                      CoderTranslation.TranslationContext.DEFAULT);
-        }
-        return coder;
-      }
-
-      private DecodeAndEmitDoFn(Collection<byte[]> elements, Coder<T> coder) throws IOException {
-        this.elements = elements;
-        this.coderSpec = CoderTranslation.toProto(coder);
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext context) throws IOException {
-        for (byte[] element : elements) {
-          context.output(CoderUtils.decodeFromByteArray(getCoder(), element));
-        }
-      }
-    }
-  }
-
   private static class SingleOutputExpandableTransformTranslator
       implements TransformTranslator<External.SingleOutputExpandableTransform> {
     @Override
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index a9e8976..d45b3c6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -40,8 +40,6 @@ import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.JobMessage;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
-import java.util.List;
-import java.util.NavigableMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -55,7 +53,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -526,28 +523,6 @@ public class DataflowPipelineJobTest {
     return message;
   }
 
-  private class FakeMonitor extends MonitoringUtil {
-    // Messages in timestamp order
-    private final NavigableMap<Long, JobMessage> timestampedMessages;
-
-    public FakeMonitor(JobMessage... messages) {
-      // The client should never be used; this Fake is intended to intercept relevant methods
-      super(mockDataflowClient);
-
-      NavigableMap<Long, JobMessage> timestampedMessages = Maps.newTreeMap();
-      for (JobMessage message : messages) {
-        timestampedMessages.put(Long.parseLong(message.getTime()), message);
-      }
-
-      this.timestampedMessages = timestampedMessages;
-    }
-
-    @Override
-    public List<JobMessage> getJobMessages(String jobId, long startTimestampMs) {
-      return ImmutableList.copyOf(timestampedMessages.headMap(startTimestampMs).values());
-    }
-  }
-
   private static class ZeroSleeper implements Sleeper {
     @Override
     public void sleep(long l) throws InterruptedException {}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
index 58f3abf..3329c1e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow.worker;
 
 import java.io.IOException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
@@ -71,7 +70,7 @@ class ReaderCache {
     this.invalidationExecutor = invalidationExecutor;
     this.cache =
         CacheBuilder.newBuilder()
-            .expireAfterWrite(cacheDuration.getMillis(), TimeUnit.MILLISECONDS)
+            .expireAfterWrite(java.time.Duration.ofMillis(cacheDuration.getMillis()))
             .removalListener(
                 (RemovalNotification<WindmillComputationKey, CacheEntry> notification) -> {
                   if (notification.getCause() != RemovalCause.EXPLICIT) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
index db5256a..aeb3f62 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
@@ -20,11 +20,11 @@ package org.apache.beam.runners.dataflow.worker;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.Closeable;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.sdk.coders.Coder;
@@ -71,7 +71,7 @@ class StateFetcher {
         server,
         CacheBuilder.newBuilder()
             .maximumWeight(100000000 /* 100 MB */)
-            .expireAfterWrite(1, TimeUnit.MINUTES)
+            .expireAfterWrite(Duration.ofMinutes(1))
             .weigher((Weigher<SideInputId, SideInputCacheEntry>) (id, entry) -> entry.size())
             .build());
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 72fc234..5418e4f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -412,7 +412,7 @@ public class StreamingDataflowWorker {
   // Using Cache with time eviction policy helps us to prevent memory leak when callback ids are
   // discarded by Dataflow service and calling commitCallback is best-effort.
   private final Cache<Long, Runnable> commitCallbacks =
-      CacheBuilder.newBuilder().expireAfterWrite(5L, TimeUnit.MINUTES).build();
+      CacheBuilder.newBuilder().expireAfterWrite(java.time.Duration.ofMinutes(5L)).build();
 
   // Map of user state names to system state names.
   // TODO(drieber): obsolete stateNameMap. Use transformUserNameToStateFamily in
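
Note the fully qualified java.time.Duration in this hunk and in ReaderCache, MicrobatchSource, UnboundedReadDeduplicator, and Read: those files already work with org.joda.time types, which Beam's public APIs expose, so an unqualified Duration would clash with or shadow the Joda import. A small sketch of bridging the two types (hypothetical names, assuming Joda-Time on the classpath):

    import org.joda.time.Duration; // the Duration that Beam's APIs expose

    public class DurationBridge {
      Duration jodaTtl = Duration.standardMinutes(5);

      java.time.Duration asJavaTime() {
        // Bridge via milliseconds when handing a Joda value to a java.time API.
        return java.time.Duration.ofMillis(jodaTtl.getMillis());
      }
    }
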
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
index aef324e..ad8a071 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
@@ -153,8 +153,8 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
   public Consumer<Integer> processedElementsConsumer() {
     usingElementsProcessed = true;
     return elementsProcessed -> {
+      lock.lock();
       try {
-        lock.lock();
         this.elementsProcessed.set(elementsProcessed);
         condition.signal();
       } finally {
@@ -168,8 +168,8 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
 
   private void maybeWait() throws Exception {
     if (shouldWait()) {
+      lock.lock();
       try {
-        lock.lock();
         while (shouldWait()) {
           LOG.debug(
               "Throttling elements at {} until more than {} elements been processed.",
@@ -185,8 +185,8 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
 
   public void abortWait() {
     usingElementsProcessed = false;
+    lock.lock();
     try {
-      lock.lock();
       condition.signal();
     } finally {
       lock.unlock();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
index 4dd13c6..33dfbc2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.dataflow.worker.util.common.worker;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
@@ -49,7 +49,7 @@ public class CachingShuffleBatchReader implements ShuffleBatchReader {
     this.cache =
         CacheBuilder.newBuilder()
             .maximumSize(maximumBatches)
-            .expireAfterAccess(expireAfterAccessMillis, TimeUnit.MILLISECONDS)
+            .expireAfterAccess(Duration.ofMillis(expireAfterAccessMillis))
             .<BatchRange, Batch>build(
                 new CacheLoader<BatchRange, Batch>() {
                   @Override
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index b9851ca..6a3f9a7 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.fnexecution.control;
 
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.Set;
@@ -26,7 +27,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -204,11 +204,11 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
                   notification -> {
                     WrappedSdkHarnessClient client = notification.getValue();
                     final int refCount;
+                    // We need to use a lock here to ensure we are not causing the environment to
+                    // be removed if beforehand a StageBundleFactory has retrieved it but not yet
+                    // issued ref() on it.
+                    refLock.lock();
                     try {
-                      // We need to use a lock here to ensure we are not causing the environment to
-                      // be removed if beforehand a StageBundleFactory has retrieved it but not yet
-                      // issued ref() on it.
-                      refLock.lock();
                       refCount = client.unref();
                     } finally {
                       refLock.unlock();
@@ -223,7 +223,7 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
                   });
 
       if (environmentExpirationMillis > 0) {
-        cacheBuilder.expireAfterWrite(environmentExpirationMillis, TimeUnit.MILLISECONDS);
+        cacheBuilder.expireAfterWrite(Duration.ofMillis(environmentExpirationMillis));
       }
 
       LoadingCache<Environment, WrappedSdkHarnessClient> cache =
@@ -474,8 +474,8 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
         currentCache = availableCaches.take();
         // Lock because the environment expiration can remove the ref for the client
         // which would close the underlying environment before we can ref it.
+        currentCache.lock.lock();
         try {
-          currentCache.lock.lock();
           client = currentCache.cache.getUnchecked(executableStage.getEnvironment());
           client.ref();
         } finally {
@@ -494,8 +494,8 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
         currentCache = environmentCaches.get(environmentIndex);
         // Lock because the environment expiration can remove the ref for the client which would
         // close the underlying environment before we can ref it.
+        currentCache.lock.lock();
         try {
-          currentCache.lock.lock();
           client = currentCache.cache.getUnchecked(executableStage.getEnvironment());
           client.ref();
         } finally {
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 7248b4e..62f78f2 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -799,11 +799,9 @@ public class RemoteExecutionTest implements Serializable {
             stateDelegator);
 
     Map<String, Coder> remoteOutputCoders = descriptor.getRemoteOutputCoders();
-    Map<String, Collection<WindowedValue<?>>> outputValues = new HashMap<>();
     Map<String, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
     for (Entry<String, Coder> remoteOutputCoder : remoteOutputCoders.entrySet()) {
       List<WindowedValue<?>> outputContents = Collections.synchronizedList(new ArrayList<>());
-      outputValues.put(remoteOutputCoder.getKey(), outputContents);
       outputReceivers.put(
           remoteOutputCoder.getKey(),
           RemoteOutputReceiver.of(
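
The deleted outputValues map was filled inside the loop but never read afterwards, which is precisely the ModifiedButNotUsed pattern: mutations of a local collection with no observable effect. A minimal sketch of the shape the check flags (hypothetical names):

    import java.util.ArrayList;
    import java.util.List;

    public class DeadCollectionSketch {
      static int countEvens(int[] xs) {
        List<Integer> evens = new ArrayList<>(); // flagged: written, never read
        int count = 0;
        for (int x : xs) {
          if (x % 2 == 0) {
            evens.add(x); // this mutation has no observable effect
            count++;
          }
        }
        return count; // deleting 'evens' leaves behavior unchanged
      }
    }
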
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 685c1a7..d07416e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -88,7 +88,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
       LOG.info("Creating reader cache. Cache interval = {} ms.", readerCacheInterval);
       readerCache =
           CacheBuilder.newBuilder()
-              .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS)
+              .expireAfterAccess(java.time.Duration.ofMillis(readerCacheInterval))
               .removalListener(new ReaderCacheRemovalListener())
               .build();
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java
index acf4e05..dadc18e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.utils;
 
+import java.time.Duration;
 import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
@@ -36,7 +36,7 @@ class SideInputStorage {
 
   /** JVM deserialized side input cache. */
   private static final Cache<Key<?>, Value<?>> materializedSideInputs =
-      CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();
+      CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build();
 
   static Cache<Key<?>, Value<?>> getMaterializedSideInputs() {
     return materializedSideInputs;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
index 5468a27..f7c1eab 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
@@ -20,12 +20,12 @@ package org.apache.beam.runners.spark.util;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
@@ -137,7 +137,7 @@ public class GlobalWatermarkHolder {
       createWatermarkCache(final Long batchDuration) {
     return CacheBuilder.newBuilder()
         // expire watermarks every half batch duration to ensure they update in every batch.
-        .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS)
+        .expireAfterWrite(Duration.ofMillis(batchDuration / 2))
         .build(new WatermarksLoader());
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
index 08fefe2..da796b5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.spark.util;
 
+import java.time.Duration;
 import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
@@ -36,7 +36,7 @@ class SideInputStorage {
 
   /** JVM deserialized side input cache. */
   private static final Cache<Key<?>, Value<?>> materializedSideInputs =
-      CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();
+      CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build();
 
   static Cache<Key<?>, Value<?>> getMaterializedSideInputs() {
     return materializedSideInputs;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 52e87d5..0be7626 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -28,7 +28,6 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.InstantCoder;
@@ -471,7 +470,7 @@ public class Read {
       restrictionCoder = restrictionCoder();
       cachedReaders =
           CacheBuilder.newBuilder()
-              .expireAfterWrite(1, TimeUnit.MINUTES)
+              .expireAfterWrite(java.time.Duration.ofMinutes(1))
               .maximumSize(100)
               .removalListener(
                   (RemovalListener<Object, UnboundedReader>)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
index db64718..959c863 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
@@ -219,26 +219,6 @@ public class SchemaCoder<T> extends CustomCoder<T> {
     return Objects.hash(schema, typeDescriptor, toRowFunction, fromRowFunction);
   }
 
-  private static class RowIdentity implements SerializableFunction<Row, Row> {
-    @Override
-    public Row apply(Row input) {
-      return input;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass());
-    }
-
-    @Override
-    public boolean equals(@Nullable Object o) {
-      if (this == o) {
-        return true;
-      }
-      return o != null && getClass() == o.getClass();
-    }
-  }
-
   @Override
   public TypeDescriptor<T> getEncodedTypeDescriptor() {
     return this.typeDescriptor;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 360c1af..ca2b85e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -680,65 +680,6 @@ public class PCollectionViews {
       public ListIterator<T> listIterator() {
         return super.listIterator();
       }
-
-      /** A {@link ListIterator} over {@link MultimapView} adapter. */
-      private class ListIteratorOverMultimapView implements ListIterator<T> {
-        private int position;
-
-        @Override
-        public boolean hasNext() {
-          return position < size();
-        }
-
-        @Override
-        public T next() {
-          if (!hasNext()) {
-            throw new NoSuchElementException();
-          }
-          T rval = get(position);
-          position += 1;
-          return rval;
-        }
-
-        @Override
-        public boolean hasPrevious() {
-          return position > 0;
-        }
-
-        @Override
-        public T previous() {
-          if (!hasPrevious()) {
-            throw new NoSuchElementException();
-          }
-          position -= 1;
-          return get(position);
-        }
-
-        @Override
-        public int nextIndex() {
-          return position;
-        }
-
-        @Override
-        public int previousIndex() {
-          return position - 1;
-        }
-
-        @Override
-        public void remove() {
-          throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void set(T e) {
-          throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void add(T e) {
-          throw new UnsupportedOperationException();
-        }
-      }
     }
   }
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 954eb7a..cd50db4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -62,10 +62,6 @@ public class CoderRegistryTest {
 
   @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(CoderRegistry.class);
 
-  private static class SerializableClass implements Serializable {}
-
-  private static class NotSerializableClass {}
-
   @Test
   public void testRegisterInstantiatedCoder() throws Exception {
     CoderRegistry registry = CoderRegistry.createDefault();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
index 896ca69..14dffa2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
@@ -21,13 +21,13 @@ import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.logging.LogRecord;
 import org.hamcrest.TypeSafeMatcher;
 import org.junit.Rule;
@@ -143,8 +143,7 @@ public class ExpectedLogsTest {
   public void testThreadSafetyOfLogSaver() throws Throwable {
     CompletionService<Void> completionService =
         new ExecutorCompletionService<>(Executors.newCachedThreadPool());
-    final long scheduledLogTime =
-        TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS) + 500L;
+    final long scheduledLogTime = Duration.ofNanos(System.nanoTime()).toMillis() + 500L;
 
     List<String> expectedStrings = new ArrayList<>();
     for (int i = 0; i < 100; i++) {
@@ -154,10 +153,7 @@ public class ExpectedLogsTest {
           () -> {
             // Have all threads started and waiting to log at about the same moment.
             sleepMillis(
-                Math.max(
-                    1,
-                    scheduledLogTime
-                        - TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS)));
+                Math.max(1, scheduledLogTime - Duration.ofNanos(System.nanoTime()).toMillis()));
             LOG.trace(expected);
             return null;
           });
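
For the nanosecond-to-millisecond conversions in this test and in
SystemNanoTimeSleeper below, the Duration form is a drop-in replacement for the
non-negative spans measured here; a sketch of the equivalence (class name is
illustrative):

    import java.time.Duration;
    import java.util.concurrent.TimeUnit;

    class NanosToMillisSketch {
      public static void main(String[] args) {
        long nanos = System.nanoTime();
        long before = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
        long after = Duration.ofNanos(nanos).toMillis();
        // Identical for non-negative inputs; for negative ones TimeUnit
        // truncates toward zero while Duration.toMillis() rounds toward
        // negative infinity, so they can differ by one.
        System.out.println(before == after);
      }
    }
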
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
index 47a28ef..fd716b9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.testing;
 
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.beam.sdk.util.Sleeper;
 
@@ -41,7 +41,7 @@ public class SystemNanoTimeSleeper implements Sleeper {
   @Override
   public void sleep(long millis) throws InterruptedException {
     long currentTime;
-    long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS);
+    long endTime = System.nanoTime() + Duration.ofMillis(millis).toNanos();
     while ((currentTime = System.nanoTime()) < endTime) {
       if (Thread.interrupted()) {
         throw new InterruptedException();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 90e0106..9166a33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -1602,15 +1602,6 @@ public class DoFnSignaturesTest {
         ProcessContext context, @StateId(STATE_ID) ValueState<String> state);
   }
 
-  private abstract static class DoFnDeclaringMyTimerId extends DoFn<KV<String, Integer>, Long> {
-
-    @TimerId("my-timer-id")
-    private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
-    @ProcessElement
-    public void foo(ProcessContext context) {}
-  }
-
   private abstract static class DoFnDeclaringTimerAndCallback
       extends DoFn<KV<String, Integer>, Long> {
     public static final String TIMER_ID = "my-timer-id";
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
index 084cb5c..eb7d8ca 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
@@ -183,7 +183,7 @@ public class GrowableOffsetRangeTrackerTest {
     tracker.checkDone();
     simpleEstimator.setEstimateRangeEnd(0L);
     Progress currentProgress = tracker.getProgress();
-    assertEquals(Long.MAX_VALUE - 10L, currentProgress.getWorkCompleted(), 0.001);
+    assertEquals(Long.MAX_VALUE - 10L, currentProgress.getWorkCompleted(), 0);
     assertEquals(0, currentProgress.getWorkRemaining(), 0.001);
   }
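
The epsilon change above is the FloatingPointAssertionWithinEpsilon fix: the
double nearest Long.MAX_VALUE - 10L is 2^63 exactly, where Math.ulp is 2048, so
a tolerance of 0.001 sits far below one ULP and can never admit a second value.
The assertion was already exact in effect; delta 0 just says so. A worked
check:

    class UlpSketch {
      public static void main(String[] args) {
        double work = (double) (Long.MAX_VALUE - 10L); // rounds to 2^63 exactly
        System.out.println(Math.ulp(work));            // prints 2048.0
        // Any delta below one ULP is an exact comparison in disguise.
      }
    }
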
 
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
index 3ed5083..999ece6 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
@@ -86,14 +86,6 @@ public class TimestampExtractTransform<InputT, OutputT>
     }
   }
 
-  private static class Unwrap<T> extends DoFn<KV<Long, T>, T> {
-
-    @ProcessElement
-    public void processElement(ProcessContext ctx) {
-      ctx.output(ctx.element().getValue());
-    }
-  }
-
   private final PCollectionTransform<InputT, OutputT> timestampedTransform;
 
   private TimestampExtractTransform(
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index 7d9d61b..a333ba9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
 
 import static org.apache.beam.vendor.calcite.v1_28_0.com.google.common.base.Preconditions.checkArgument;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -222,7 +223,7 @@ public abstract class BeamKafkaTable extends SchemaBaseBeamTable {
       throw new NoEstimationException("There is no partition with messages in it.");
     }
 
-    ConsumerRecords<T, T> records = consumer.poll(1000);
+    ConsumerRecords<T, T> records = consumer.poll(Duration.ofSeconds(1));
 
     // Kafka guarantees the delivery of messages in order they arrive to each partition.
     // Therefore the first message seen from each partition is the first message arrived to that.
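
The poll change above is more than cosmetic: KafkaConsumer.poll(long) has been
deprecated since Kafka 2.0, and the Duration overload also caps time spent
blocked on metadata fetches, which the long overload did not. In sketch form
(the surrounding class and consumer argument are illustrative):

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class PollSketch {
      static <K, V> ConsumerRecords<K, V> pollOnce(KafkaConsumer<K, V> consumer) {
        // poll(1000) could block well past the timeout while fetching
        // metadata; the Duration overload bounds the total wait.
        return consumer.poll(Duration.ofSeconds(1));
      }
    }
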
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
index e58f09b..3d1cded 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
@@ -112,8 +112,8 @@ public class CancellableQueue<T extends @NonNull Object> {
    * queue clears the exception.
    */
   public void cancel(Exception exception) {
+    lock.lock();
     try {
-      lock.lock();
       cancellationException = exception;
       notEmpty.signalAll();
       notFull.signalAll();
@@ -124,8 +124,8 @@ public class CancellableQueue<T extends @NonNull Object> {
 
   /** Enables the queue to be re-used after it has been cancelled. */
   public void reset() {
+    lock.lock();
     try {
-      lock.lock();
       cancellationException = null;
       addIndex = 0;
       takeIndex = 0;
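
Both hunks above are the LockNotBeforeTry fix. With lock() inside the try, a
failed acquisition still reaches the finally block, and unlock() on a lock the
thread never held throws IllegalMonitorStateException, masking the original
failure. The safe idiom, sketched on a standalone ReentrantLock:

    import java.util.concurrent.locks.ReentrantLock;

    class LockIdiomSketch {
      private final ReentrantLock lock = new ReentrantLock();
      private int state;

      void update(int value) {
        lock.lock(); // acquire before try: finally runs only once the lock is held
        try {
          state = value;
        } finally {
          lock.unlock();
        }
      }
    }
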
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 62a6180..a9636ff 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -17,12 +17,12 @@
  */
 package org.apache.beam.fn.harness;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -253,7 +253,7 @@ public class FnHarness {
       LoadingCache<String, BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors =
           CacheBuilder.newBuilder()
               .maximumSize(1000)
-              .expireAfterAccess(10, TimeUnit.MINUTES)
+              .expireAfterAccess(Duration.ofMinutes(10))
               .build(
                   new CacheLoader<String, BeamFnApi.ProcessBundleDescriptor>() {
                     @Override
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
index ae897cb..c9ab166 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
@@ -272,8 +272,8 @@ public class StateFetchingIteratorsTest {
       }
       assertFalse(valuesIter2.hasNext());
       assertTrue(valuesIter2.isReady());
-
       // The contents agree.
+      assertArrayEquals(expected, Iterables.toArray(results, Object.class));
       assertArrayEquals(expected, Iterables.toArray(values, Object.class));
     }
   }
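
This hunk and the XmlSourceTest hunk at the end of the patch both address
ModifiedButNotUsed, taking opposite routes: here the write-only results
collection gains an assertion, there the write-only list is deleted. The
warning in miniature (names are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    class WriteOnlySketch {
      static int count(List<String> input) {
        List<String> seen = new ArrayList<>();
        for (String s : input) {
          seen.add(s); // errorprone: seen is modified but never read
        }
        return input.size();
      }
    }
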
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 21c7a46..c214fda 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -83,7 +82,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
 
   private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
       CacheBuilder.newBuilder()
-          .expireAfterAccess(5, TimeUnit.MINUTES)
+          .expireAfterAccess(java.time.Duration.ofMinutes(5))
           .removalListener(
               (RemovalNotification<String, StreamAppendClient> removal) -> {
                 @Nullable final StreamAppendClient streamAppendClient = removal.getValue();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 3d6e33c..ede5129 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -102,7 +101,7 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT>
 
   private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
       CacheBuilder.newBuilder()
-          .expireAfterAccess(5, TimeUnit.MINUTES)
+          .expireAfterAccess(java.time.Duration.ofMinutes(5))
           .removalListener(
               (RemovalNotification<String, StreamAppendClient> removal) -> {
                 @Nullable final StreamAppendClient streamAppendClient = removal.getValue();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java
index 8108d09..cd2adcc1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java
@@ -24,7 +24,7 @@ import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
 import javax.annotation.Nullable;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
@@ -46,8 +46,8 @@ final class LimitingTopicBacklogReader implements TopicBacklogReader {
         CacheBuilder.newBuilder()
             .ticker(ticker)
             .maximumSize(1)
-            .expireAfterWrite(1, TimeUnit.MINUTES)
-            .refreshAfterWrite(10, TimeUnit.SECONDS)
+            .expireAfterWrite(Duration.ofMinutes(1))
+            .refreshAfterWrite(Duration.ofSeconds(10))
             .build(
                 new CacheLoader<String, ComputeMessageStatsResponse>() {
                   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
index faff06e..e34863e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -33,13 +33,7 @@ import com.google.spanner.v1.CommitRequest;
 import com.google.spanner.v1.CommitResponse;
 import com.google.spanner.v1.ExecuteSqlRequest;
 import com.google.spanner.v1.PartialResultSet;
-import io.grpc.CallOptions;
-import io.grpc.Channel;
-import io.grpc.ClientCall;
-import io.grpc.ClientInterceptor;
-import io.grpc.MethodDescriptor;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.ReleaseInfo;
@@ -206,22 +200,4 @@ public class SpannerAccessor implements AutoCloseable {
       }
     }
   }
-
-  private static class CommitDeadlineSettingInterceptor implements ClientInterceptor {
-    private final long commitDeadlineMilliseconds;
-
-    private CommitDeadlineSettingInterceptor(Duration commitDeadline) {
-      this.commitDeadlineMilliseconds = commitDeadline.getMillis();
-    }
-
-    @Override
-    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
-        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
-      if (method.getFullMethodName().equals("google.spanner.v1.Spanner/Commit")) {
-        callOptions =
-            callOptions.withDeadlineAfter(commitDeadlineMilliseconds, TimeUnit.MILLISECONDS);
-      }
-      return next.newCall(method, callOptions);
-    }
-  }
 }
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
index f4e3677..2d10bdb 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
@@ -26,7 +26,6 @@ import java.sql.SQLException;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.common.TestRow;
-import org.apache.beam.sdk.io.jdbc.JdbcIO;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 
@@ -80,13 +79,4 @@ class TestRowDBWritable extends TestRow implements DBWritable, Writable {
     id = in.readInt();
     name = in.readUTF();
   }
-
-  private static class PrepareStatementFromTestRow
-      implements JdbcIO.PreparedStatementSetter<TestRow> {
-    @Override
-    public void setParameters(TestRow element, PreparedStatement statement) throws SQLException {
-      statement.setLong(1, element.id());
-      statement.setString(2, element.name());
-    }
-  }
 }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
index 75c8270..8ad0aff 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
@@ -24,6 +24,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -641,7 +642,7 @@ class KafkaExactlyOnceSink<K, V>
       ShardWriterCache() {
         this.cache =
             CacheBuilder.newBuilder()
-                .expireAfterWrite(IDLE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .expireAfterWrite(Duration.ofMillis(IDLE_TIMEOUT_MS))
                 .<Integer, ShardWriter<K, V>>removalListener(
                     notification -> {
                       if (notification.getCause() != RemovalCause.EXPLICIT) {
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index aaeb1b4..de70ac0 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -329,7 +329,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
   * good to experiment. Often multiple marks would be finalized in a batch; it reduces
    * finalization overhead to wait a short while and finalize only the last checkpoint mark.
    */
-  private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+  private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(1);
 
   private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = Duration.millis(10);
   private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);
@@ -520,7 +520,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
       while (!closed.get()) {
         try {
           if (records.isEmpty()) {
-            records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
+            records = consumer.poll(KAFKA_POLL_TIMEOUT);
           } else if (availableRecordsQueue.offer(
               records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
             records = ConsumerRecords.empty();
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index b7f8e7a..7ce49a9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -58,7 +58,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -177,7 +176,8 @@ class ReadFromKafkaDoFn<K, V>
 
   private transient LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
 
-  private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+  private static final org.joda.time.Duration KAFKA_POLL_TIMEOUT =
+      org.joda.time.Duration.millis(1000);
 
   @VisibleForTesting final DeserializerProvider keyDeserializerProvider;
   @VisibleForTesting final DeserializerProvider valueDeserializerProvider;
@@ -290,6 +290,7 @@ class ReadFromKafkaDoFn<K, V>
     return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
   }
 
+  @SuppressWarnings("PreferJavaTimeOverload")
   @ProcessElement
   public ProcessContinuation processElement(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor,
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
index d45942e..98af830 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
@@ -540,10 +540,8 @@ public class XmlSourceTest {
     exception.expectMessage("MyCustomValidationEventHandler failure mesage");
     try (Reader<WrongTrainType> reader = source.createReader(null)) {
 
-      List<WrongTrainType> results = new ArrayList<>();
       for (boolean available = reader.start(); available; available = reader.advance()) {
-        WrongTrainType train = reader.getCurrent();
-        results.add(train);
+        reader.getCurrent();
       }
     }
   }