You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2021/12/08 21:27:08 UTC
[beam] branch master updated: [BEAM-11936] Fix errorprone warnings (#15890)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 06a5e67 [BEAM-11936] Fix errorprone warnings (#15890)
06a5e67 is described below
commit 06a5e67332aae53ea90dedb4ef6421c2a7d65035
Author: Benjamin Gonzalez <74...@users.noreply.github.com>
AuthorDate: Wed Dec 8 15:26:11 2021 -0600
[BEAM-11936] Fix errorprone warnings (#15890)
* [BEAM-11936] Fix errorprone warning: FloatingPointAssertionWithinEpsilon
* [BEAM-11936] Fix errorprone warning: LockNotBeforeTry
* [BEAM-11936] Fix errorprone warning: PreferJavaTimeOverload
* [BEAM-11936] Fix errorprone warning: ModifiedButNotUsed
* [BEAM-11936] Fix errorprone warning: UnusedNestedClass
* [BEAM-11936] Remove suppresswarnings
* [BEAM-11936] Fix suppressed warnings
* [BEAM-11936] Fix errorprone warnings after merge master
* [BEAM-11936] Remove suppressWarnings errorprone: PreferJavaTimeOverload
* [BEAM-11936] Remove suppressWarnings errorprone: PreferJavaTimeOverload
* [BEAM-11936] Fix leftover warning errorprone: PreferJavaTimeOverload
* [BEAM-11936] Fix leftover warning errorprone: PreferJavaTimeOverload
* [BEAM-11936] Add suppresswarning
* [BEAM-11936] Remove unused inner class
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 6 -
.../apache/beam/examples/snippets/Snippets.java | 8 ++
.../core/construction/PTransformMatchersTest.java | 21 ----
.../beam/runners/direct/NanosOffsetClock.java | 6 +-
.../runners/direct/TransformEvaluatorRegistry.java | 16 ---
.../runners/direct/UnboundedReadDeduplicator.java | 3 +-
.../runners/direct/CloningBundleFactoryTest.java | 67 -----------
.../flink/FlinkStreamingPipelineTranslator.java | 4 +-
.../flink/FlinkStreamingTransformTranslators.java | 17 ---
.../wrappers/streaming/DoFnOperator.java | 2 +-
.../state/FlinkBroadcastStateInternals.java | 131 ---------------------
.../streaming/ExecutableStageDoFnOperatorTest.java | 1 +
.../beam/runners/dataflow/DataflowRunner.java | 110 -----------------
.../runners/dataflow/DataflowPipelineJobTest.java | 25 ----
.../beam/runners/dataflow/worker/ReaderCache.java | 3 +-
.../beam/runners/dataflow/worker/StateFetcher.java | 4 +-
.../dataflow/worker/StreamingDataflowWorker.java | 2 +-
.../fn/data/RemoteGrpcPortWriteOperation.java | 6 +-
.../common/worker/CachingShuffleBatchReader.java | 4 +-
.../control/DefaultJobBundleFactory.java | 16 +--
.../fnexecution/control/RemoteExecutionTest.java | 2 -
.../beam/runners/spark/io/MicrobatchSource.java | 2 +-
.../translation/utils/SideInputStorage.java | 4 +-
.../runners/spark/util/GlobalWatermarkHolder.java | 4 +-
.../beam/runners/spark/util/SideInputStorage.java | 4 +-
.../src/main/java/org/apache/beam/sdk/io/Read.java | 3 +-
.../org/apache/beam/sdk/schemas/SchemaCoder.java | 20 ----
.../apache/beam/sdk/values/PCollectionViews.java | 59 ----------
.../apache/beam/sdk/coders/CoderRegistryTest.java | 4 -
.../apache/beam/sdk/testing/ExpectedLogsTest.java | 10 +-
.../beam/sdk/testing/SystemNanoTimeSleeper.java | 4 +-
.../sdk/transforms/reflect/DoFnSignaturesTest.java | 9 --
.../GrowableOffsetRangeTrackerTest.java | 2 +-
.../core/translate/TimestampExtractTransform.java | 8 --
.../sql/meta/provider/kafka/BeamKafkaTable.java | 3 +-
.../org/apache/beam/sdk/fn/CancellableQueue.java | 4 +-
.../java/org/apache/beam/fn/harness/FnHarness.java | 4 +-
.../harness/state/StateFetchingIteratorsTest.java | 2 +-
.../bigquery/StorageApiWriteUnshardedRecords.java | 3 +-
.../bigquery/StorageApiWritesShardedRecords.java | 3 +-
.../internal/LimitingTopicBacklogReader.java | 6 +-
.../beam/sdk/io/gcp/spanner/SpannerAccessor.java | 24 ----
.../sdk/io/hadoop/format/TestRowDBWritable.java | 10 --
.../beam/sdk/io/kafka/KafkaExactlyOnceSink.java | 3 +-
.../beam/sdk/io/kafka/KafkaUnboundedReader.java | 4 +-
.../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 5 +-
.../org/apache/beam/sdk/io/xml/XmlSourceTest.java | 4 +-
47 files changed, 66 insertions(+), 596 deletions(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 001abca..7eafafc 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1140,18 +1140,12 @@ class BeamModulePlugin implements Plugin<Project> {
options.errorprone.errorproneArgs.add("-Xep:EqualsGetClass:OFF")
options.errorprone.errorproneArgs.add("-Xep:EqualsUnsafeCast:OFF")
options.errorprone.errorproneArgs.add("-Xep:ExtendsAutoValue:OFF")
- options.errorprone.errorproneArgs.add("-Xep:FloatingPointAssertionWithinEpsilon:OFF")
options.errorprone.errorproneArgs.add("-Xep:JavaTimeDefaultTimeZone:OFF")
- options.errorprone.errorproneArgs.add("-Xep:LockNotBeforeTry:OFF")
options.errorprone.errorproneArgs.add("-Xep:MixedMutabilityReturnType:OFF")
- options.errorprone.errorproneArgs.add("-Xep:PreferJavaTimeOverload:OFF")
- options.errorprone.errorproneArgs.add("-Xep:ModifiedButNotUsed:OFF")
options.errorprone.errorproneArgs.add("-Xep:ThreadPriorityCheck:OFF")
options.errorprone.errorproneArgs.add("-Xep:TimeUnitConversionChecker:OFF")
options.errorprone.errorproneArgs.add("-Xep:UndefinedEquals:OFF")
options.errorprone.errorproneArgs.add("-Xep:UnnecessaryLambda:OFF")
- options.errorprone.errorproneArgs.add("-Xep:UnusedVariable:OFF")
- options.errorprone.errorproneArgs.add("-Xep:UnusedNestedClass:OFF")
options.errorprone.errorproneArgs.add("-Xep:UnsafeReflectiveConstructionCast:OFF")
}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index d2e78fe..1bde081 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -191,6 +191,7 @@ public class Snippets {
}
{
+ @SuppressWarnings("ModifiedButNotUsed")
// [START BigQueryDataTypes]
TableRow row = new TableRow();
row.set("string", "abc");
@@ -1174,6 +1175,7 @@ public class Snippets {
}
}
+ @SuppressWarnings("unused")
private static class BundleFinalization {
private static class BundleFinalizationDoFn extends DoFn<String, Integer> {
// [START BundleFinalize]
@@ -1191,6 +1193,7 @@ public class Snippets {
}
}
+ @SuppressWarnings("unused")
private static class SplittableDoFn {
private static void seekToNextRecordBoundaryInFile(
@@ -1230,6 +1233,7 @@ public class Snippets {
}
// [END SDF_BasicExample]
+ @SuppressWarnings("unused")
private static class BasicExampleWithInitialSplitting extends FileToWordsFn {
// [START SDF_BasicExampleWithSplitting]
void splitRestriction(
@@ -1248,6 +1252,7 @@ public class Snippets {
// [END SDF_BasicExampleWithSplitting]
}
+ @SuppressWarnings("unused")
private static class BasicExampleWithBadTryClaimLoop extends DoFn<String, Integer> {
// [START SDF_BadTryClaimLoop]
@ProcessElement
@@ -1271,6 +1276,7 @@ public class Snippets {
// [END SDF_BadTryClaimLoop]
}
+ @SuppressWarnings("unused")
private static class CustomWatermarkEstimatorExample extends DoFn<String, Integer> {
private static Instant currentWatermark = Instant.now();
@@ -1336,6 +1342,7 @@ public class Snippets {
}
// [END SDF_CustomWatermarkEstimator]
+ @SuppressWarnings("unused")
private static class UserInitiatedCheckpointExample extends DoFn<String, Integer> {
public static class ThrottlingException extends Exception {}
@@ -1398,6 +1405,7 @@ public class Snippets {
// [END SDF_Truncate]
}
+ @SuppressWarnings("unused")
private static class GetSizeExample extends DoFn<String, Integer> {
// [START SDF_GetSize]
@GetSize
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 35762bb..185b52d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -57,9 +57,7 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -74,7 +72,6 @@ import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Rule;
@@ -598,22 +595,4 @@ public class PTransformMatchersTest implements Serializable {
ResourceHints.create(),
p);
}
-
- private static class FakeFilenamePolicy extends FilenamePolicy {
- @Override
- public ResourceId windowedFilename(
- int shardNumber,
- int numShards,
- BoundedWindow window,
- PaneInfo paneInfo,
- FileBasedSink.OutputFileHints outputFileHints) {
- throw new UnsupportedOperationException("should not be called");
- }
-
- @Override
- public @Nullable ResourceId unwindowedFilename(
- int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
- throw new UnsupportedOperationException("should not be called");
- }
- }
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
index f26c907..286d3d9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
import org.joda.time.Instant;
/** A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time. */
@@ -37,8 +37,6 @@ class NanosOffsetClock implements Clock {
@Override
public Instant now() {
return new Instant(
- baseMillis
- + TimeUnit.MILLISECONDS.convert(
- System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS));
+ baseMillis + Duration.ofNanos(System.nanoTime() - nanosAtBaseMillis).toMillis());
}
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 3d96fc7..6f750d4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -39,7 +39,6 @@ import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
@@ -140,21 +139,6 @@ class TransformEvaluatorRegistry {
}
}
- /**
- * A translator just to vend the URN. This will need to be moved to runners-core-construction-java
- * once SDF is reorganized appropriately.
- */
- private static class SplittableParDoProcessElementsTranslator
- extends TransformPayloadTranslator.NotSerializable<ProcessElements<?, ?, ?, ?, ?>> {
-
- private SplittableParDoProcessElementsTranslator() {}
-
- @Override
- public String getUrn(ProcessElements<?, ?, ?, ?, ?> transform) {
- return SPLITTABLE_PROCESS_URN;
- }
- }
-
// the TransformEvaluatorFactories can construct instances of all generic types of transform,
// so all instances of a primitive can be handled with the same evaluator factory.
private final Map<String, TransformEvaluatorFactory> factories;
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
index b222698..125c026 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.core.construction.SplittableParDo.PrimitiveUnboundedRead;
import org.apache.beam.runners.local.StructuralKey;
@@ -74,7 +73,7 @@ interface UnboundedReadDeduplicator {
private CachedIdDeduplicator() {
ids =
CacheBuilder.newBuilder()
- .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS)
+ .expireAfterAccess(java.time.Duration.ofMillis(MAX_RETENTION_SINCE_ACCESS))
.maximumSize(100_000L)
.build(new TrueBooleanLoader());
}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index 6a6d7df..6d7a975 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -36,9 +36,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
@@ -191,69 +189,4 @@ public class CloningBundleFactoryTest {
throw new CoderException("Decode not allowed");
}
}
-
- private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
- @Override
- public void encode(Record value, OutputStream outStream) throws CoderException, IOException {}
-
- @Override
- public Record decode(InputStream inStream) throws CoderException, IOException {
- return new Record() {
- @Override
- public String toString() {
- return "DecodedRecord";
- }
- };
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- @Override
- public Object structuralValue(Record value) {
- return value;
- }
- }
-
- private static class RecordNotConsistentWithEqualsStructuralValueCoder
- extends AtomicCoder<Record> {
- @Override
- public void encode(Record value, OutputStream outStream) throws CoderException, IOException {}
-
- @Override
- public Record decode(InputStream inStream) throws CoderException, IOException {
- return new Record() {
- @Override
- public String toString() {
- return "DecodedRecord";
- }
- };
- }
-
- @Override
- public boolean consistentWithEquals() {
- return false;
- }
-
- @Override
- public Object structuralValue(Record value) {
- return value;
- }
- }
-
- private static class IdentityDoFn extends DoFn<Record, Record> {
- @ProcessElement
- public void proc(ProcessContext ctxt) {
- ctxt.output(ctxt.element());
- }
- }
-
- private static class SimpleIdentity extends SimpleFunction<Record, Record> {
- @Override
- public Record apply(Record input) {
- return input;
- }
- }
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index b7f99b8..bcb3883 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -21,13 +21,13 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.WR
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -368,7 +368,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
cache =
CacheBuilder.newBuilder()
.maximumSize(CACHE_MAX_SIZE)
- .expireAfterAccess(CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS)
+ .expireAfterAccess(Duration.ofSeconds(CACHE_EXPIRE_SECONDS))
.build();
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 9379ef5..18ece76 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -1417,23 +1417,6 @@ class FlinkStreamingTransformTranslators {
}
}
- /**
- * A translator just to vend the URN. This will need to be moved to runners-core-construction-java
- * once SDF is reorganized appropriately.
- */
- private static class SplittableParDoProcessElementsTranslator
- extends PTransformTranslation.TransformPayloadTranslator.NotSerializable<
- SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?, ?>> {
-
- private SplittableParDoProcessElementsTranslator() {}
-
- @Override
- public String getUrn(
- SplittableParDoViaKeyedWorkItems.ProcessElements<?, ?, ?, ?, ?> transform) {
- return SPLITTABLE_PROCESS_URN;
- }
- }
-
/** Registers classes specialized to the Flink runner. */
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class FlinkTransformsRegistrar implements TransformPayloadTranslatorRegistrar {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 60abddd..4f4f6c1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -1125,8 +1125,8 @@ public class DoFnOperator<InputT, OutputT>
}
private void buffer(KV<Integer, WindowedValue<?>> taggedValue) {
+ bufferLock.lock();
try {
- bufferLock.lock();
pushedBackElementsHandler.pushBack(taggedValue);
} catch (Exception e) {
throw new RuntimeException("Couldn't pushback element.", e);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
index e6c47ed..7b89e01 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
@@ -553,137 +553,6 @@ public class FlinkBroadcastStateInternals<K> implements StateInternals {
}
}
- private class FlinkKeyedCombiningState<K2, InputT, AccumT, OutputT>
- extends AbstractBroadcastState<AccumT> implements CombiningState<InputT, AccumT, OutputT> {
-
- private final StateNamespace namespace;
- private final StateTag<CombiningState<InputT, AccumT, OutputT>> address;
- private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-
- FlinkKeyedCombiningState(
- OperatorStateBackend flinkStateBackend,
- StateTag<CombiningState<InputT, AccumT, OutputT>> address,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
- StateNamespace namespace,
- Coder<AccumT> accumCoder,
- FlinkBroadcastStateInternals<K2> flinkStateInternals,
- PipelineOptions pipelineOptions) {
- super(flinkStateBackend, address.getId(), namespace, accumCoder, pipelineOptions);
-
- this.namespace = namespace;
- this.address = address;
- this.combineFn = combineFn;
- }
-
- @Override
- public CombiningState<InputT, AccumT, OutputT> readLater() {
- return this;
- }
-
- @Override
- public void add(InputT value) {
- try {
- AccumT current = readInternal();
- if (current == null) {
- current = combineFn.createAccumulator();
- }
- current = combineFn.addInput(current, value);
- writeInternal(current);
- } catch (Exception e) {
- throw new RuntimeException("Error adding to state.", e);
- }
- }
-
- @Override
- public void addAccum(AccumT accum) {
- try {
- AccumT current = readInternal();
- if (current == null) {
- writeInternal(accum);
- } else {
- current = combineFn.mergeAccumulators(Arrays.asList(current, accum));
- writeInternal(current);
- }
- } catch (Exception e) {
- throw new RuntimeException("Error adding to state.", e);
- }
- }
-
- @Override
- public AccumT getAccum() {
- try {
- AccumT accum = readInternal();
- return accum != null ? accum : combineFn.createAccumulator();
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(accumulators);
- }
-
- @Override
- public OutputT read() {
- try {
- AccumT accum = readInternal();
- if (accum != null) {
- return combineFn.extractOutput(accum);
- } else {
- return combineFn.extractOutput(combineFn.createAccumulator());
- }
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- return readInternal() == null;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- clearInternal();
- }
-
- @Override
- public boolean equals(@Nullable Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkKeyedCombiningState<?, ?, ?, ?> that = (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
private class FlinkCombiningStateWithContext<K2, InputT, AccumT, OutputT>
extends AbstractBroadcastState<AccumT> implements CombiningState<InputT, AccumT, OutputT> {
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 4d27af6..af609e9 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -637,6 +637,7 @@ public class ExecutableStageDoFnOperatorTest {
assertThat(statefulDoFnRunner, instanceOf(StatefulDoFnRunner.class));
}
+ @SuppressWarnings("LockNotBeforeTry")
@Test
public void testEnsureStateCleanupWithKeyedInputCleanupTimer() {
InMemoryTimerInternals inMemoryTimerInternals = new InMemoryTimerInternals();
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0f23d61..212e03a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage;
-import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
@@ -49,7 +48,6 @@ import java.io.PrintWriter;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -63,7 +61,6 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
-import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
import org.apache.beam.runners.core.construction.Environments;
@@ -71,7 +68,6 @@ import org.apache.beam.runners.core.construction.External;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PipelineTranslation;
-import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
@@ -130,7 +126,6 @@ import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
-import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.Impulse;
@@ -145,7 +140,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.NameUtils;
@@ -1783,110 +1777,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// ================================================================================
- /**
- * A PTranform override factory which maps Create.Values PTransforms for streaming pipelines into
- * a Dataflow specific variant.
- */
- private static class StreamingFnApiCreateOverrideFactory<T>
- implements PTransformOverrideFactory<PBegin, PCollection<T>, Create.Values<T>> {
-
- @Override
- public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
- AppliedPTransform<PBegin, PCollection<T>, Create.Values<T>> transform) {
- Create.Values<T> original = transform.getTransform();
- PCollection<T> output =
- (PCollection) Iterables.getOnlyElement(transform.getOutputs().values());
- return PTransformReplacement.of(
- transform.getPipeline().begin(), new StreamingFnApiCreate<>(original, output));
- }
-
- @Override
- public Map<PCollection<?>, ReplacementOutput> mapOutputs(
- Map<TupleTag<?>, PCollection<?>> outputs, PCollection<T> newOutput) {
- return ReplacementOutputs.singleton(outputs, newOutput);
- }
- }
-
- /**
- * Specialized implementation for {@link org.apache.beam.sdk.transforms.Create.Values
- * Create.Values} for the Dataflow runner in streaming mode over the Fn API.
- */
- private static class StreamingFnApiCreate<T> extends PTransform<PBegin, PCollection<T>> {
-
- private final Create.Values<T> transform;
- private final transient PCollection<T> originalOutput;
-
- private StreamingFnApiCreate(Create.Values<T> transform, PCollection<T> originalOutput) {
- this.transform = transform;
- this.originalOutput = originalOutput;
- }
-
- @Override
- public final PCollection<T> expand(PBegin input) {
- try {
- PCollection<T> pc =
- Pipeline.applyTransform(input, Impulse.create())
- .apply(
- ParDo.of(
- DecodeAndEmitDoFn.fromIterable(
- transform.getElements(), originalOutput.getCoder())));
- pc.setCoder(originalOutput.getCoder());
- return pc;
- } catch (IOException e) {
- throw new IllegalStateException("Unable to encode elements.", e);
- }
- }
-
- /**
- * A DoFn which stores encoded versions of elements and a representation of a Coder capable of
- * decoding those elements.
- *
- * <p>TODO: BEAM-2422 - Make this a SplittableDoFn.
- */
- private static class DecodeAndEmitDoFn<T> extends DoFn<byte[], T> {
-
- public static <T> DecodeAndEmitDoFn<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
- throws IOException {
- ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
- for (T element : elements) {
- byte[] bytes = encodeToByteArray(elemCoder, element);
- allElementsBytes.add(bytes);
- }
- return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder);
- }
-
- private final Collection<byte[]> elements;
- private final RunnerApi.MessageWithComponents coderSpec;
-
- // lazily initialized by parsing coderSpec
- private transient Coder<T> coder;
-
- private Coder<T> getCoder() throws IOException {
- if (coder == null) {
- coder =
- (Coder)
- CoderTranslation.fromProto(
- coderSpec.getCoder(),
- RehydratedComponents.forComponents(coderSpec.getComponents()),
- CoderTranslation.TranslationContext.DEFAULT);
- }
- return coder;
- }
-
- private DecodeAndEmitDoFn(Collection<byte[]> elements, Coder<T> coder) throws IOException {
- this.elements = elements;
- this.coderSpec = CoderTranslation.toProto(coder);
- }
-
- @ProcessElement
- public void processElement(ProcessContext context) throws IOException {
- for (byte[] element : elements) {
- context.output(CoderUtils.decodeFromByteArray(getCoder(), element));
- }
- }
- }
- }
-
private static class SingleOutputExpandableTransformTranslator
implements TransformTranslator<External.SingleOutputExpandableTransform> {
@Override
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index a9e8976..d45b3c6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -40,8 +40,6 @@ import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.JobMessage;
import java.io.IOException;
import java.net.SocketTimeoutException;
-import java.util.List;
-import java.util.NavigableMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -55,7 +53,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
@@ -526,28 +523,6 @@ public class DataflowPipelineJobTest {
return message;
}
- private class FakeMonitor extends MonitoringUtil {
- // Messages in timestamp order
- private final NavigableMap<Long, JobMessage> timestampedMessages;
-
- public FakeMonitor(JobMessage... messages) {
- // The client should never be used; this Fake is intended to intercept relevant methods
- super(mockDataflowClient);
-
- NavigableMap<Long, JobMessage> timestampedMessages = Maps.newTreeMap();
- for (JobMessage message : messages) {
- timestampedMessages.put(Long.parseLong(message.getTime()), message);
- }
-
- this.timestampedMessages = timestampedMessages;
- }
-
- @Override
- public List<JobMessage> getJobMessages(String jobId, long startTimestampMs) {
- return ImmutableList.copyOf(timestampedMessages.headMap(startTimestampMs).values());
- }
- }
-
private static class ZeroSleeper implements Sleeper {
@Override
public void sleep(long l) throws InterruptedException {}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
index 58f3abf..3329c1e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow.worker;
import java.io.IOException;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
@@ -71,7 +70,7 @@ class ReaderCache {
this.invalidationExecutor = invalidationExecutor;
this.cache =
CacheBuilder.newBuilder()
- .expireAfterWrite(cacheDuration.getMillis(), TimeUnit.MILLISECONDS)
+ .expireAfterWrite(java.time.Duration.ofMillis(cacheDuration.getMillis()))
.removalListener(
(RemovalNotification<WindmillComputationKey, CacheEntry> notification) -> {
if (notification.getCause() != RemovalCause.EXPLICIT) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
index db5256a..aeb3f62 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StateFetcher.java
@@ -20,11 +20,11 @@ package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.io.Closeable;
+import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.Coder;
@@ -71,7 +71,7 @@ class StateFetcher {
server,
CacheBuilder.newBuilder()
.maximumWeight(100000000 /* 100 MB */)
- .expireAfterWrite(1, TimeUnit.MINUTES)
+ .expireAfterWrite(Duration.ofMinutes(1))
.weigher((Weigher<SideInputId, SideInputCacheEntry>) (id, entry) -> entry.size())
.build());
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 72fc234..5418e4f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -412,7 +412,7 @@ public class StreamingDataflowWorker {
// Using Cache with time eviction policy helps us to prevent memory leak when callback ids are
// discarded by Dataflow service and calling commitCallback is best-effort.
private final Cache<Long, Runnable> commitCallbacks =
- CacheBuilder.newBuilder().expireAfterWrite(5L, TimeUnit.MINUTES).build();
+ CacheBuilder.newBuilder().expireAfterWrite(java.time.Duration.ofMinutes(5L)).build();
// Map of user state names to system state names.
// TODO(drieber): obsolete stateNameMap. Use transformUserNameToStateFamily in
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
index aef324e..ad8a071 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java
@@ -153,8 +153,8 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
public Consumer<Integer> processedElementsConsumer() {
usingElementsProcessed = true;
return elementsProcessed -> {
+ lock.lock();
try {
- lock.lock();
this.elementsProcessed.set(elementsProcessed);
condition.signal();
} finally {
@@ -168,8 +168,8 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
private void maybeWait() throws Exception {
if (shouldWait()) {
+ lock.lock();
try {
- lock.lock();
while (shouldWait()) {
LOG.debug(
"Throttling elements at {} until more than {} elements been processed.",
@@ -185,8 +185,8 @@ public class RemoteGrpcPortWriteOperation<T> extends ReceivingOperation {
public void abortWait() {
usingElementsProcessed = false;
+ lock.lock();
try {
- lock.lock();
condition.signal();
} finally {
lock.unlock();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
index 4dd13c6..33dfbc2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java
@@ -18,8 +18,8 @@
package org.apache.beam.runners.dataflow.worker.util.common.worker;
import java.io.IOException;
+import java.time.Duration;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
@@ -49,7 +49,7 @@ public class CachingShuffleBatchReader implements ShuffleBatchReader {
this.cache =
CacheBuilder.newBuilder()
.maximumSize(maximumBatches)
- .expireAfterAccess(expireAfterAccessMillis, TimeUnit.MILLISECONDS)
+ .expireAfterAccess(Duration.ofMillis(expireAfterAccessMillis))
.<BatchRange, Batch>build(
new CacheLoader<BatchRange, Batch>() {
@Override
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index b9851ca..6a3f9a7 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.fnexecution.control;
import com.google.auto.value.AutoValue;
import java.io.IOException;
+import java.time.Duration;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
@@ -26,7 +27,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -204,11 +204,11 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
notification -> {
WrappedSdkHarnessClient client = notification.getValue();
final int refCount;
+ // We need to use a lock here to ensure we are not causing the environment to
+ // be removed if beforehand a StageBundleFactory has retrieved it but not yet
+ // issued ref() on it.
+ refLock.lock();
try {
- // We need to use a lock here to ensure we are not causing the environment to
- // be removed if beforehand a StageBundleFactory has retrieved it but not yet
- // issued ref() on it.
- refLock.lock();
refCount = client.unref();
} finally {
refLock.unlock();
@@ -223,7 +223,7 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
});
if (environmentExpirationMillis > 0) {
- cacheBuilder.expireAfterWrite(environmentExpirationMillis, TimeUnit.MILLISECONDS);
+ cacheBuilder.expireAfterWrite(Duration.ofMillis(environmentExpirationMillis));
}
LoadingCache<Environment, WrappedSdkHarnessClient> cache =
@@ -474,8 +474,8 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
currentCache = availableCaches.take();
// Lock because the environment expiration can remove the ref for the client
// which would close the underlying environment before we can ref it.
+ currentCache.lock.lock();
try {
- currentCache.lock.lock();
client = currentCache.cache.getUnchecked(executableStage.getEnvironment());
client.ref();
} finally {
@@ -494,8 +494,8 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
currentCache = environmentCaches.get(environmentIndex);
// Lock because the environment expiration can remove the ref for the client which would
// close the underlying environment before we can ref it.
+ currentCache.lock.lock();
try {
- currentCache.lock.lock();
client = currentCache.cache.getUnchecked(executableStage.getEnvironment());
client.ref();
} finally {
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 7248b4e..62f78f2 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -799,11 +799,9 @@ public class RemoteExecutionTest implements Serializable {
stateDelegator);
Map<String, Coder> remoteOutputCoders = descriptor.getRemoteOutputCoders();
- Map<String, Collection<WindowedValue<?>>> outputValues = new HashMap<>();
Map<String, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
for (Entry<String, Coder> remoteOutputCoder : remoteOutputCoders.entrySet()) {
List<WindowedValue<?>> outputContents = Collections.synchronizedList(new ArrayList<>());
- outputValues.put(remoteOutputCoder.getKey(), outputContents);
outputReceivers.put(
remoteOutputCoder.getKey(),
RemoteOutputReceiver.of(
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 685c1a7..d07416e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -88,7 +88,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
LOG.info("Creating reader cache. Cache interval = {} ms.", readerCacheInterval);
readerCache =
CacheBuilder.newBuilder()
- .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS)
+ .expireAfterAccess(java.time.Duration.ofMillis(readerCacheInterval))
.removalListener(new ReaderCacheRemovalListener())
.build();
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java
index acf4e05..dadc18e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/utils/SideInputStorage.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.utils;
+import java.time.Duration;
import java.util.Objects;
-import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
@@ -36,7 +36,7 @@ class SideInputStorage {
/** JVM deserialized side input cache. */
private static final Cache<Key<?>, Value<?>> materializedSideInputs =
- CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();
+ CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build();
static Cache<Key<?>, Value<?>> getMaterializedSideInputs() {
return materializedSideInputs;
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
index 5468a27..f7c1eab 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
@@ -20,12 +20,12 @@ package org.apache.beam.runners.spark.util;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import java.io.Serializable;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
@@ -137,7 +137,7 @@ public class GlobalWatermarkHolder {
createWatermarkCache(final Long batchDuration) {
return CacheBuilder.newBuilder()
// expire watermarks every half batch duration to ensure they update in every batch.
- .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS)
+ .expireAfterWrite(Duration.ofMillis(batchDuration / 2))
.build(new WatermarksLoader());
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
index 08fefe2..da796b5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SideInputStorage.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.spark.util;
+import java.time.Duration;
import java.util.Objects;
-import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
@@ -36,7 +36,7 @@ class SideInputStorage {
/** JVM deserialized side input cache. */
private static final Cache<Key<?>, Value<?>> materializedSideInputs =
- CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();
+ CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(5)).build();
static Cache<Key<?>, Value<?>> getMaterializedSideInputs() {
return materializedSideInputs;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 52e87d5..0be7626 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -28,7 +28,6 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
-import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
@@ -471,7 +470,7 @@ public class Read {
restrictionCoder = restrictionCoder();
cachedReaders =
CacheBuilder.newBuilder()
- .expireAfterWrite(1, TimeUnit.MINUTES)
+ .expireAfterWrite(java.time.Duration.ofMinutes(1))
.maximumSize(100)
.removalListener(
(RemovalListener<Object, UnboundedReader>)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
index db64718..959c863 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
@@ -219,26 +219,6 @@ public class SchemaCoder<T> extends CustomCoder<T> {
return Objects.hash(schema, typeDescriptor, toRowFunction, fromRowFunction);
}
- private static class RowIdentity implements SerializableFunction<Row, Row> {
- @Override
- public Row apply(Row input) {
- return input;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(getClass());
- }
-
- @Override
- public boolean equals(@Nullable Object o) {
- if (this == o) {
- return true;
- }
- return o != null && getClass() == o.getClass();
- }
- }
-
@Override
public TypeDescriptor<T> getEncodedTypeDescriptor() {
return this.typeDescriptor;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 360c1af..ca2b85e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -680,65 +680,6 @@ public class PCollectionViews {
public ListIterator<T> listIterator() {
return super.listIterator();
}
-
- /** A {@link ListIterator} over {@link MultimapView} adapter. */
- private class ListIteratorOverMultimapView implements ListIterator<T> {
- private int position;
-
- @Override
- public boolean hasNext() {
- return position < size();
- }
-
- @Override
- public T next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- T rval = get(position);
- position += 1;
- return rval;
- }
-
- @Override
- public boolean hasPrevious() {
- return position > 0;
- }
-
- @Override
- public T previous() {
- if (!hasPrevious()) {
- throw new NoSuchElementException();
- }
- position -= 1;
- return get(position);
- }
-
- @Override
- public int nextIndex() {
- return position;
- }
-
- @Override
- public int previousIndex() {
- return position - 1;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void set(T e) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void add(T e) {
- throw new UnsupportedOperationException();
- }
- }
}
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 954eb7a..cd50db4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -62,10 +62,6 @@ public class CoderRegistryTest {
@Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(CoderRegistry.class);
- private static class SerializableClass implements Serializable {}
-
- private static class NotSerializableClass {}
-
@Test
public void testRegisterInstantiatedCoder() throws Exception {
CoderRegistry registry = CoderRegistry.createDefault();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
index 896ca69..14dffa2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java
@@ -21,13 +21,13 @@ import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.logging.LogRecord;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Rule;
@@ -143,8 +143,7 @@ public class ExpectedLogsTest {
public void testThreadSafetyOfLogSaver() throws Throwable {
CompletionService<Void> completionService =
new ExecutorCompletionService<>(Executors.newCachedThreadPool());
- final long scheduledLogTime =
- TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS) + 500L;
+ final long scheduledLogTime = Duration.ofNanos(System.nanoTime()).toMillis() + 500L;
List<String> expectedStrings = new ArrayList<>();
for (int i = 0; i < 100; i++) {
@@ -154,10 +153,7 @@ public class ExpectedLogsTest {
() -> {
// Have all threads started and waiting to log at about the same moment.
sleepMillis(
- Math.max(
- 1,
- scheduledLogTime
- - TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS)));
+ Math.max(1, scheduledLogTime - Duration.ofNanos(System.nanoTime()).toMillis()));
LOG.trace(expected);
return null;
});
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
index 47a28ef..fd716b9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SystemNanoTimeSleeper.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.testing;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
import java.util.concurrent.locks.LockSupport;
import org.apache.beam.sdk.util.Sleeper;
@@ -41,7 +41,7 @@ public class SystemNanoTimeSleeper implements Sleeper {
@Override
public void sleep(long millis) throws InterruptedException {
long currentTime;
- long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS);
+ long endTime = System.nanoTime() + Duration.ofMillis(millis).toNanos();
while ((currentTime = System.nanoTime()) < endTime) {
if (Thread.interrupted()) {
throw new InterruptedException();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 90e0106..9166a33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -1602,15 +1602,6 @@ public class DoFnSignaturesTest {
ProcessContext context, @StateId(STATE_ID) ValueState<String> state);
}
- private abstract static class DoFnDeclaringMyTimerId extends DoFn<KV<String, Integer>, Long> {
-
- @TimerId("my-timer-id")
- private final TimerSpec bizzle = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
- @ProcessElement
- public void foo(ProcessContext context) {}
- }
-
private abstract static class DoFnDeclaringTimerAndCallback
extends DoFn<KV<String, Integer>, Long> {
public static final String TIMER_ID = "my-timer-id";
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
index 084cb5c..eb7d8ca 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTrackerTest.java
@@ -183,7 +183,7 @@ public class GrowableOffsetRangeTrackerTest {
tracker.checkDone();
simpleEstimator.setEstimateRangeEnd(0L);
Progress currentProgress = tracker.getProgress();
- assertEquals(Long.MAX_VALUE - 10L, currentProgress.getWorkCompleted(), 0.001);
+ assertEquals(Long.MAX_VALUE - 10L, currentProgress.getWorkCompleted(), 0);
assertEquals(0, currentProgress.getWorkRemaining(), 0.001);
}
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
index 3ed5083..999ece6 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/TimestampExtractTransform.java
@@ -86,14 +86,6 @@ public class TimestampExtractTransform<InputT, OutputT>
}
}
- private static class Unwrap<T> extends DoFn<KV<Long, T>, T> {
-
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- ctx.output(ctx.element().getValue());
- }
- }
-
private final PCollectionTransform<InputT, OutputT> timestampedTransform;
private TimestampExtractTransform(
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index 7d9d61b..a333ba9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
import static org.apache.beam.vendor.calcite.v1_28_0.com.google.common.base.Preconditions.checkArgument;
+import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -222,7 +223,7 @@ public abstract class BeamKafkaTable extends SchemaBaseBeamTable {
throw new NoEstimationException("There is no partition with messages in it.");
}
- ConsumerRecords<T, T> records = consumer.poll(1000);
+ ConsumerRecords<T, T> records = consumer.poll(Duration.ofSeconds(1));
// Kafka guarantees the delivery of messages in order they arrive to each partition.
// Therefore the first message seen from each partition is the first message arrived to that.
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
index e58f09b..3d1cded 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/CancellableQueue.java
@@ -112,8 +112,8 @@ public class CancellableQueue<T extends @NonNull Object> {
* queue clears the exception.
*/
public void cancel(Exception exception) {
+ lock.lock();
try {
- lock.lock();
cancellationException = exception;
notEmpty.signalAll();
notFull.signalAll();
@@ -124,8 +124,8 @@ public class CancellableQueue<T extends @NonNull Object> {
/** Enables the queue to be re-used after it has been cancelled. */
public void reset() {
+ lock.lock();
try {
- lock.lock();
cancellationException = null;
addIndex = 0;
takeIndex = 0;
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 62a6180..a9636ff 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -17,12 +17,12 @@
*/
package org.apache.beam.fn.harness;
+import java.time.Duration;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -253,7 +253,7 @@ public class FnHarness {
LoadingCache<String, BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors =
CacheBuilder.newBuilder()
.maximumSize(1000)
- .expireAfterAccess(10, TimeUnit.MINUTES)
+ .expireAfterAccess(Duration.ofMinutes(10))
.build(
new CacheLoader<String, BeamFnApi.ProcessBundleDescriptor>() {
@Override
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
index ae897cb..c9ab166 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
@@ -272,8 +272,8 @@ public class StateFetchingIteratorsTest {
}
assertFalse(valuesIter2.hasNext());
assertTrue(valuesIter2.isReady());
-
// The contents agree.
+ assertArrayEquals(expected, Iterables.toArray(results, Object.class));
assertArrayEquals(expected, Iterables.toArray(values, Object.class));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 21c7a46..c214fda 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -83,7 +82,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
CacheBuilder.newBuilder()
- .expireAfterAccess(5, TimeUnit.MINUTES)
+ .expireAfterAccess(java.time.Duration.ofMinutes(5))
.removalListener(
(RemovalNotification<String, StreamAppendClient> removal) -> {
@Nullable final StreamAppendClient streamAppendClient = removal.getValue();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 3d6e33c..ede5129 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -102,7 +101,7 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT>
private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
CacheBuilder.newBuilder()
- .expireAfterAccess(5, TimeUnit.MINUTES)
+ .expireAfterAccess(java.time.Duration.ofMinutes(5))
.removalListener(
(RemovalNotification<String, StreamAppendClient> removal) -> {
@Nullable final StreamAppendClient streamAppendClient = removal.getValue();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java
index 8108d09..cd2adcc1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/LimitingTopicBacklogReader.java
@@ -24,7 +24,7 @@ import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.errorprone.annotations.concurrent.GuardedBy;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
@@ -46,8 +46,8 @@ final class LimitingTopicBacklogReader implements TopicBacklogReader {
CacheBuilder.newBuilder()
.ticker(ticker)
.maximumSize(1)
- .expireAfterWrite(1, TimeUnit.MINUTES)
- .refreshAfterWrite(10, TimeUnit.SECONDS)
+ .expireAfterWrite(Duration.ofMinutes(1))
+ .refreshAfterWrite(Duration.ofSeconds(10))
.build(
new CacheLoader<String, ComputeMessageStatsResponse>() {
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
index faff06e..e34863e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -33,13 +33,7 @@ import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
-import io.grpc.CallOptions;
-import io.grpc.Channel;
-import io.grpc.ClientCall;
-import io.grpc.ClientInterceptor;
-import io.grpc.MethodDescriptor;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.ReleaseInfo;
@@ -206,22 +200,4 @@ public class SpannerAccessor implements AutoCloseable {
}
}
}
-
- private static class CommitDeadlineSettingInterceptor implements ClientInterceptor {
- private final long commitDeadlineMilliseconds;
-
- private CommitDeadlineSettingInterceptor(Duration commitDeadline) {
- this.commitDeadlineMilliseconds = commitDeadline.getMillis();
- }
-
- @Override
- public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
- MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
- if (method.getFullMethodName().equals("google.spanner.v1.Spanner/Commit")) {
- callOptions =
- callOptions.withDeadlineAfter(commitDeadlineMilliseconds, TimeUnit.MILLISECONDS);
- }
- return next.newCall(method, callOptions);
- }
- }
}
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
index f4e3677..2d10bdb 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestRowDBWritable.java
@@ -26,7 +26,6 @@ import java.sql.SQLException;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.common.TestRow;
-import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
@@ -80,13 +79,4 @@ class TestRowDBWritable extends TestRow implements DBWritable, Writable {
id = in.readInt();
name = in.readUTF();
}
-
- private static class PrepareStatementFromTestRow
- implements JdbcIO.PreparedStatementSetter<TestRow> {
- @Override
- public void setParameters(TestRow element, PreparedStatement statement) throws SQLException {
- statement.setLong(1, element.id());
- statement.setString(2, element.name());
- }
- }
}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
index 75c8270..8ad0aff 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
@@ -24,6 +24,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -641,7 +642,7 @@ class KafkaExactlyOnceSink<K, V>
ShardWriterCache() {
this.cache =
CacheBuilder.newBuilder()
- .expireAfterWrite(IDLE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .expireAfterWrite(Duration.ofMillis(IDLE_TIMEOUT_MS))
.<Integer, ShardWriter<K, V>>removalListener(
notification -> {
if (notification.getCause() != RemovalCause.EXPLICIT) {
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index aaeb1b4..de70ac0 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -329,7 +329,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
* good to experiment. Often multiple marks would be finalized in a batch, it it reduce
* finalization overhead to wait a short while and finalize only the last checkpoint mark.
*/
- private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+ private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(1);
private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = Duration.millis(10);
private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);
@@ -520,7 +520,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
while (!closed.get()) {
try {
if (records.isEmpty()) {
- records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
+ records = consumer.poll(KAFKA_POLL_TIMEOUT);
} else if (availableRecordsQueue.offer(
records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
records = ConsumerRecords.empty();
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index b7f8e7a..7ce49a9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -58,7 +58,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,7 +176,8 @@ class ReadFromKafkaDoFn<K, V>
private transient LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
- private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+ private static final org.joda.time.Duration KAFKA_POLL_TIMEOUT =
+ org.joda.time.Duration.millis(1000);
@VisibleForTesting final DeserializerProvider keyDeserializerProvider;
@VisibleForTesting final DeserializerProvider valueDeserializerProvider;
@@ -290,6 +290,7 @@ class ReadFromKafkaDoFn<K, V>
return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetPoller);
}
+ @SuppressWarnings("PreferJavaTimeOverload")
@ProcessElement
public ProcessContinuation processElement(
@Element KafkaSourceDescriptor kafkaSourceDescriptor,
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
index d45942e..98af830 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
@@ -540,10 +540,8 @@ public class XmlSourceTest {
exception.expectMessage("MyCustomValidationEventHandler failure mesage");
try (Reader<WrongTrainType> reader = source.createReader(null)) {
- List<WrongTrainType> results = new ArrayList<>();
for (boolean available = reader.start(); available; available = reader.advance()) {
- WrongTrainType train = reader.getCurrent();
- results.add(train);
+ reader.getCurrent();
}
}
}