You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/14 23:38:06 UTC
[beam] Diff for: [GitHub] kennknowles merged pull request #7505: [BEAM-5009]
Pin spotless and googleJavaFormat to latest; apply globally
diff --git a/build.gradle b/build.gradle
index f8d8ef2e9605..bb9de48e4f32 100644
--- a/build.gradle
+++ b/build.gradle
@@ -54,7 +54,7 @@ buildscript {
classpath "io.spring.gradle:propdeps-plugin:0.0.9.RELEASE" // Enable provided and optional configurations
classpath "gradle.plugin.org.nosphere.apache:creadur-rat-gradle:0.3.1" // Enable Apache license enforcement
classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.11.0" // Enable Avro code generation
- classpath "com.diffplug.spotless:spotless-plugin-gradle:3.7.0" // Enable a code formatting plugin
+ classpath "com.diffplug.spotless:spotless-plugin-gradle:3.16.0" // Enable a code formatting plugin
classpath "gradle.plugin.com.github.blindpirate:gogradle:0.10" // Enable Go code compilation
classpath "gradle.plugin.com.palantir.gradle.docker:gradle-docker:0.20.1" // Enable building Docker containers
classpath "cz.malohlava:visteg:1.0.3" // Enable generating Gradle task dependencies as ".dot" files
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 126b4c97dd2f..1d7ba4baba9a 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -727,7 +727,7 @@ class BeamModulePlugin implements Plugin<Project> {
enforceCheck !disableSpotlessCheck
java {
licenseHeader javaLicenseHeader
- googleJavaFormat()
+ googleJavaFormat('1.7')
// Details see: https://github.com/diffplug/spotless/blob/master/PADDEDCELL.md
paddedCell()
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
index 1cb8d34275c4..942e8f35a0a5 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
@@ -62,13 +62,12 @@
*/
@Audience(Audience.Type.CLIENT)
@Recommended(
- reason =
- "Might be useful to override the default "
- + "implementation because of performance reasons"
- + "(e.g. using bloom filters), which might reduce the space complexity",
- state = StateComplexity.CONSTANT,
- repartitions = 1
-)
+ reason =
+ "Might be useful to override the default "
+ + "implementation because of performance reasons"
+ + "(e.g. using bloom filters), which might reduce the space complexity",
+ state = StateComplexity.CONSTANT,
+ repartitions = 1)
public class Distinct<InputT, OutputT> extends ShuffleOperator<InputT, OutputT, OutputT>
implements CompositeOperator<InputT, OutputT> {
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
index ddd9eb023a98..e90d2441d76d 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
@@ -71,13 +71,12 @@
*/
@Audience(Audience.Type.CLIENT)
@Recommended(
- reason =
- "Might be useful to override because of performance reasons in a "
- + "specific join types (e.g. sort join), which might reduce the space "
- + "complexity",
- state = StateComplexity.LINEAR,
- repartitions = 1
-)
+ reason =
+ "Might be useful to override because of performance reasons in a "
+ + "specific join types (e.g. sort join), which might reduce the space "
+ + "complexity",
+ state = StateComplexity.LINEAR,
+ repartitions = 1)
public class Join<LeftT, RightT, KeyT, OutputT>
extends ShuffleOperator<Object, KeyT, KV<KeyT, OutputT>> {
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
index af2a518cbf76..6b1eb42789f2 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
@@ -87,13 +87,12 @@
*/
@Audience(Audience.Type.CLIENT)
@Recommended(
- reason =
- "Is very recommended to override because of performance in "
- + "a specific area of (mostly) batch calculations where combiners "
- + "can be efficiently used in the executor-specific implementation",
- state = StateComplexity.CONSTANT_IF_COMBINABLE,
- repartitions = 1
-)
+ reason =
+ "Is very recommended to override because of performance in "
+ + "a specific area of (mostly) batch calculations where combiners "
+ + "can be efficiently used in the executor-specific implementation",
+ state = StateComplexity.CONSTANT_IF_COMBINABLE,
+ repartitions = 1)
public class ReduceByKey<InputT, KeyT, ValueT, OutputT>
extends ShuffleOperator<InputT, KeyT, KV<KeyT, OutputT>> implements TypeAware.Value<ValueT> {
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
index cf8f89611c92..4b2eb8c774ea 100644
--- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
@@ -207,15 +207,15 @@ public void codersAndTypesSection() {
final PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
- //Register `KryoCoderProvider` which attempt to use `KryoCoder` to every non-primitive type
+ // Register `KryoCoderProvider` which attempt to use `KryoCoder` to every non-primitive type
KryoCoderProvider.of().registerTo(pipeline);
- //Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types
+ // Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types
options.as(KryoOptions.class).setKryoRegistrationRequired(true);
KryoCoderProvider.of(
- kryo -> { //KryoRegistrar of your uwn
- kryo.register(KryoSerializedElementType.class); //other may follow
+ kryo -> { // KryoRegistrar of your uwn
+ kryo.register(KryoSerializedElementType.class); // other may follow
})
.registerTo(pipeline);
@@ -510,14 +510,15 @@ public void flatMapWithTimeExtractorOperator() {
new SomeEventObject(3),
new SomeEventObject(4)));
- // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
+ // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods
+ // returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
FlatMap.named("extract-event-time")
.of(events)
.using((SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e))
.eventTimeBy(SomeEventObject::getEventTimeInMillis)
.output();
- //Euphoria will now know event time for each event
+ // Euphoria will now know event time for each event
pipeline.run();
}
@@ -530,7 +531,7 @@ public void filterOperator() {
// suppose nums contains: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
PCollection<Integer> divisibleBythree =
Filter.named("divisibleByFive").of(nums).by(e -> e % 3 == 0).output();
- //divisibleBythree will contain: [ 0, 3, 6, 9]
+ // divisibleBythree will contain: [ 0, 3, 6, 9]
PAssert.that(divisibleBythree).containsInAnyOrder(0, 3, 6, 9);
pipeline.run();
@@ -542,7 +543,7 @@ public void reduceByKeyTestOperator1() {
PCollection<String> animals =
pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));
- //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+ // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-couts")
.of(animals)
@@ -551,7 +552,8 @@ public void reduceByKeyTestOperator1() {
.valueBy(e -> 1)
.reduceBy(Stream::count)
.output();
- // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+ // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5,
+ // 1L), KV.of(8, 1L) ]
PAssert.that(countOfAnimalNamesByLength)
.containsInAnyOrder(
@@ -566,7 +568,7 @@ public void reduceByKeyTestOperatorCombinable() {
PCollection<String> animals =
pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));
- //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+ // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-couts")
.of(animals)
@@ -575,7 +577,8 @@ public void reduceByKeyTestOperatorCombinable() {
.valueBy(e -> 1L)
.combineBy(s -> s.mapToLong(l -> l).sum())
.output();
- // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+ // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5,
+ // 1L), KV.of(8, 1L) ]
PAssert.that(countOfAnimalNamesByLength)
.containsInAnyOrder(
@@ -590,7 +593,7 @@ public void reduceByKeyTestOperatorContext() {
PCollection<String> animals =
pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));
- //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+ // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-couts")
.of(animals)
@@ -603,7 +606,8 @@ public void reduceByKeyTestOperatorContext() {
collector.asContext().getCounter("num-of-keys").increment();
})
.output();
- // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+ // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5,
+ // 1L), KV.of(8, 1L) ]
PAssert.that(countOfAnimalNamesByLength)
.containsInAnyOrder(
@@ -659,7 +663,7 @@ public void reduceByKeyTestOperatorFold() {
PCollection<String> animals =
pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));
- //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+ // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-couts")
.of(animals)
@@ -668,7 +672,8 @@ public void reduceByKeyTestOperatorFold() {
.valueBy(e -> 1L)
.combineBy(Fold.of((l1, l2) -> l1 + l2))
.output();
- // countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+ // countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5,
+ // 1L), KV.of(8, 1L) ]
PAssert.that(countOfAnimalNamesByLength)
.containsInAnyOrder(
@@ -681,7 +686,7 @@ public void reduceByKeyTestOperatorFold() {
public void testSumByKeyOperator() {
PCollection<Integer> input = pipeline.apply(Create.of(asList(1, 2, 3, 4, 5, 6, 7, 8, 9)));
- //suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
+ // suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
PCollection<KV<Integer, Long>> output =
SumByKey.named("sum-odd-and-even")
.of(input)
@@ -707,11 +712,12 @@ public void testUnionOperator() {
.apply("rodents", Create.of("squirrel", "mouse", "rat", "lemming", "beaver"))
.setTypeDescriptor(TypeDescriptors.strings());
- //suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
- //suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
+ // suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
+ // suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
PCollection<String> animals = Union.named("to-animals").of(cats, rodents).output();
- // animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"
+ // animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat",
+ // "lemming", "beaver"
PAssert.that(animals)
.containsInAnyOrder(
"cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver");
@@ -732,7 +738,8 @@ public void testAssignEventTimeOperator() {
new SomeEventObject(3),
new SomeEventObject(4))));
- // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
+ // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods
+ // returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
AssignEventTime.named("extract-event-time")
.of(events)
@@ -795,17 +802,18 @@ public void testTopPerKeyOperator() {
"duck",
"caterpillar"));
- // suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ]
+ // suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat",
+ // "duck", "caterpillar" ]
PCollection<Triple<Character, String, Integer>> longestNamesByLetter =
TopPerKey.named("longest-animal-names")
.of(animals)
.keyBy(name -> name.charAt(0)) // first character is the key
.valueBy(UnaryFunction.identity()) // value type is the same as input element type
- .scoreBy(
- String
- ::length) // length defines score, note that Integer implements Comparable<Integer>
+ .scoreBy(String::length) // length defines score, note that Integer implements
+ // Comparable<Integer>
.output();
- // longestNamesByLetter will contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
+ // longestNamesByLetter will contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant",
+ // 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
PAssert.that(longestNamesByLetter)
.containsInAnyOrder(
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java
index 991879d428dd..b183799e070e 100644
--- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java
@@ -17,40 +17,40 @@
*/
package org.apache.beam.sdk.extensions.euphoria.core.testkit;
//
-//import static org.junit.Assert.assertEquals;
-//
-//import java.time.Instant;
-//import java.util.Arrays;
-//import java.util.List;
-//import java.util.Objects;
-//import java.util.concurrent.atomic.AtomicBoolean;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.TimeInterval;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceStateByKey;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.State;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.StateContext;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorage;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorageDescriptor;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
-//import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.AbstractOperatorTest;
-//import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.Processing;
-//import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-//import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-//import org.apache.beam.sdk.values.KV;
-//import org.junit.Test;
-//
-///** Tests capabilities of {@link Windowing}. */
-//@Processing(Processing.Type.ALL)
-//public class WindowingTest extends AbstractOperatorTest {
+// import static org.junit.Assert.assertEquals;
+//
+// import java.time.Instant;
+// import java.util.Arrays;
+// import java.util.List;
+// import java.util.Objects;
+// import java.util.concurrent.atomic.AtomicBoolean;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.TimeInterval;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceStateByKey;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.State;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.StateContext;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorage;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorageDescriptor;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
+// import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.AbstractOperatorTest;
+// import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.Processing;
+// import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+// import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+// import org.apache.beam.sdk.values.KV;
+// import org.junit.Test;
+//
+/// ** Tests capabilities of {@link Windowing}. */
+// @Processing(Processing.Type.ALL)
+// public class WindowingTest extends AbstractOperatorTest {
//
// static final AtomicBoolean ON_CLEAR_VALIDATED = new AtomicBoolean(false);
//
@@ -270,7 +270,8 @@
// // extract window timestamp
// return FlatMap.of(keyValues)
// .using(
-// (KV<String, Integer> in, Collector<Triple<Instant, Instant, Integer>> out) -> {
+// (KV<String, Integer> in, Collector<Triple<Instant, Instant, Integer>> out) ->
+// {
// long windowBegin = ((TimeInterval) out.getWindow()).getStartMillis();
// long windowEnd = ((TimeInterval) out.getWindow()).getEndMillis();
// out.collect(
@@ -485,4 +486,4 @@
// }
// }
// */
-//}
+// }
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java
index daff0621a462..1e4d6e827da2 100644
--- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java
@@ -55,18 +55,19 @@ public void twoUsesOneViewTest() {
// create input to be broadcast
PCollection<KV<Integer, String>> lengthStrings =
- p.apply("names",
- Create.of(KV.of(1, "one"), KV.of(2, "two"), KV.of(3, "three")))
- .setTypeDescriptor(
- TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()));
+ p.apply("names", Create.of(KV.of(1, "one"), KV.of(2, "two"), KV.of(3, "three")))
+ .setTypeDescriptor(
+ TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()));
UnaryFunction<KV<Integer, String>, Integer> sharedKeyExtractor = KV::getKey;
// other datasets to be joined with
PCollection<String> letters =
- p.apply("letters", Create.of("a", "b", "c", "d")).setTypeDescriptor(TypeDescriptors.strings());
+ p.apply("letters", Create.of("a", "b", "c", "d"))
+ .setTypeDescriptor(TypeDescriptors.strings());
PCollection<String> acronyms =
- p.apply("acronyms", Create.of("B2K", "DIY", "FKA", "EOBD")).setTypeDescriptor(TypeDescriptors.strings());
+ p.apply("acronyms", Create.of("B2K", "DIY", "FKA", "EOBD"))
+ .setTypeDescriptor(TypeDescriptors.strings());
PCollection<KV<Integer, String>> lettersJoined =
LeftJoin.named("join-letters-with-lengths")
@@ -88,7 +89,6 @@ public void twoUsesOneViewTest() {
TypeDescriptors.strings())
.output();
-
PAssert.that(lettersJoined)
.containsInAnyOrder(
KV.of(1, "a-one"), KV.of(1, "b-one"), KV.of(1, "c-one"), KV.of(1, "d-one"));
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 6caf76033b2f..0e9127ae7631 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -106,7 +106,7 @@ public static void startDatabase() throws Exception {
derbyServer.ping();
started = true;
} catch (Throwable t) {
- //ignore, still trying to start
+ // ignore, still trying to start
}
}
}
diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 76139efe1777..b08a848ddc72 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -360,7 +360,8 @@ public void testCheckpointMarkSafety() throws Exception {
session.close();
connection.close();
- // create a JmsIO.Read with a decorated ConnectionFactory which will introduce a delay in sending
+ // create a JmsIO.Read with a decorated ConnectionFactory which will introduce a delay in
+ // sending
// acknowledgements - this should help uncover threading issues around checkpoint management.
JmsIO.Read spec =
JmsIO.read()
@@ -383,7 +384,8 @@ public void testCheckpointMarkSafety() throws Exception {
// the messages are still pending in the queue (no ACK yet)
assertEquals(messagesToProcess, count(QUEUE));
- // we finalize the checkpoint for the already-processed messages while simultaneously consuming the remainder of
+ // we finalize the checkpoint for the already-processed messages while simultaneously consuming
+ // the remainder of
// messages from the queue
Thread runner =
new Thread(
@@ -399,7 +401,8 @@ public void testCheckpointMarkSafety() throws Exception {
runner.start();
reader.getCheckpointMark().finalizeCheckpoint();
- // Concurrency issues would cause an exception to be thrown before this method exits, failing the test
+ // Concurrency issues would cause an exception to be thrown before this method exits, failing
+ // the test
runner.join();
}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 41efb690c33a..bd46f711cf46 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -114,7 +114,7 @@ public void verifyDeterministic() throws NonDeterministicException {
@Override
public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value) {
return kvCoder.isRegisterByteSizeObserverCheap(value.getKV());
- //TODO : do we have to implement getEncodedSize()?
+ // TODO : do we have to implement getEncodedSize()?
}
@SuppressWarnings("unchecked")
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 22f595febc69..ee058aa8e226 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -268,8 +268,7 @@ public Instant getWatermark() {
}
// Return minimum watermark among partitions.
- return partitionStates
- .stream()
+ return partitionStates.stream()
.map(PartitionState::updateAndGetWatermark)
.min(Comparator.naturalOrder())
.get();
@@ -279,8 +278,7 @@ public Instant getWatermark() {
public CheckpointMark getCheckpointMark() {
reportBacklog();
return new KafkaCheckpointMark(
- partitionStates
- .stream()
+ partitionStates.stream()
.map(
p ->
new PartitionMark(
@@ -394,7 +392,7 @@ public long getSplitBacklogBytes() {
private static final long UNINITIALIZED_OFFSET = -1;
- //Add SpEL instance to cover the interface difference of Kafka client
+ // Add SpEL instance to cover the interface difference of Kafka client
private transient ConsumerSpEL consumerSpEL;
/** watermark before any records have been read. */
@@ -604,9 +602,7 @@ private void commitCheckpointMark(KafkaCheckpointMark checkpointMark) {
LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
consumer.commitSync(
- checkpointMark
- .getPartitions()
- .stream()
+ checkpointMark.getPartitions().stream()
.filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
.collect(
Collectors.toMap(
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
index e1845d3dbc7b..5fbe2727b95f 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
@@ -42,8 +42,7 @@
private static List<Long> getTimestampsForRecords(
TimestampPolicy<String, String> policy, Instant now, List<Long> timestampOffsets) {
- return timestampOffsets
- .stream()
+ return timestampOffsets.stream()
.map(
ts -> {
Instant result =
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 6d25b42ab62e..63e14a3b2639 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -238,9 +238,7 @@ public synchronized void assign(final Collection<TopicPartition> assigned) {
@Override
public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
Map<TopicPartition, Long> timestampsToSearch) {
- return timestampsToSearch
- .entrySet()
- .stream()
+ return timestampsToSearch.entrySet().stream()
.map(
e -> {
// In test scope, timestamp == offset.
@@ -283,10 +281,11 @@ public void run() {
if (config.get("inject.error.at.eof") != null) {
consumer.setException(new KafkaException("Injected error in consumer.poll()"));
}
- // MockConsumer.poll(timeout) does not actually wait even when there aren't any records.
+ // MockConsumer.poll(timeout) does not actually wait even when there aren't any
+ // records.
// Add a small wait here in order to avoid busy looping in the reader.
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
- //TODO: BEAM-4086: testUnboundedSourceWithoutBoundedWrapper() occasionally hangs
+ // TODO: BEAM-4086: testUnboundedSourceWithoutBoundedWrapper() occasionally hangs
// without this wait. Need to look into it.
}
consumer.schedulePollTask(this);
@@ -1583,11 +1582,13 @@ private static void verifyProducerRecords(
producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
mockProducer =
new MockProducer<Integer, Long>(
- false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
+ false, // disable synchronous completion of send. see ProducerSendCompletionThread
+ // below.
new IntegerSerializer(),
new LongSerializer()) {
- // override flush() so that it does not complete all the waiting sends, giving a chance to
+ // override flush() so that it does not complete all the waiting sends, giving a chance
+ // to
// ProducerCompletionThread to inject errors.
@Override
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
index 1bb971e5e7ee..362f5d393808 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
@@ -62,8 +62,7 @@ public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
shardsAtStartingPoint,
startingPoint.getTimestamp());
return new KinesisReaderCheckpoint(
- shardsAtStartingPoint
- .stream()
+ shardsAtStartingPoint.stream()
.map(shard -> new ShardCheckpoint(streamName, shard.getShardId(), startingPoint))
.collect(Collectors.toList()));
}
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
index 8a9afd844c54..6fefb43dee0f 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
@@ -35,8 +35,7 @@ public GetKinesisRecordsResult(
final String streamName,
final String shardId) {
this.records =
- records
- .stream()
+ records.stream()
.map(
input -> {
assert input != null; // to make FindBugs happy
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
index 6b5d841b0ff9..3f41639834af 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
@@ -215,9 +215,7 @@ boolean allShardsUpToDate() {
KinesisReaderCheckpoint getCheckpointMark() {
ImmutableMap<String, ShardRecordsIterator> currentShardIterators = shardIteratorsMap.get();
return new KinesisReaderCheckpoint(
- currentShardIterators
- .values()
- .stream()
+ currentShardIterators.values().stream()
.map(
shardRecordsIterator -> {
checkArgument(
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index 5e3597d969dd..4749d2f9797a 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -159,8 +159,7 @@ public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
@Override
public AmazonKinesis getKinesisClient() {
return new AmazonKinesisMock(
- shardedData
- .stream()
+ shardedData.stream()
.map(testDatas -> transform(testDatas, TestData::convertToRecord))
.collect(Collectors.toList()),
numberOfRecordsPerGet);
diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
index 69394bc858c5..126b70682bc1 100644
--- a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
+++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
@@ -322,9 +322,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
} else {
Stream<BoundedSource<T>> sources =
- spec.getKuduService()
- .createTabletScanners(spec)
- .stream()
+ spec.getKuduService().createTabletScanners(spec).stream()
.map(s -> new KuduIO.KuduSource<T>(spec, spec.getCoder(), s));
return sources.collect(Collectors.toList());
}
diff --git a/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java b/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java
index d403c38fdcdf..13b92406ad16 100644
--- a/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java
+++ b/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java
@@ -103,8 +103,7 @@ public void testReadQueue() throws Exception {
new String(message.getBody(), StandardCharsets.UTF_8)));
List<String> records =
- generateRecords(maxNumRecords)
- .stream()
+ generateRecords(maxNumRecords).stream()
.map(record -> new String(record, StandardCharsets.UTF_8))
.collect(Collectors.toList());
PAssert.that(output).containsInAnyOrder(records);
@@ -149,8 +148,7 @@ public void testReadExchange() throws Exception {
new String(message.getBody(), StandardCharsets.UTF_8)));
List<String> records =
- generateRecords(maxNumRecords)
- .stream()
+ generateRecords(maxNumRecords).stream()
.map(record -> new String(record, StandardCharsets.UTF_8))
.collect(Collectors.toList());
PAssert.that(output).containsInAnyOrder(records);
@@ -201,8 +199,7 @@ public void testReadExchange() throws Exception {
public void testWriteQueue() throws Exception {
final int maxNumRecords = 1000;
List<RabbitMqMessage> data =
- generateRecords(maxNumRecords)
- .stream()
+ generateRecords(maxNumRecords).stream()
.map(bytes -> new RabbitMqMessage(bytes))
.collect(Collectors.toList());
p.apply(Create.of(data))
@@ -245,8 +242,7 @@ public void testWriteQueue() throws Exception {
public void testWriteExchange() throws Exception {
final int maxNumRecords = 1000;
List<RabbitMqMessage> data =
- generateRecords(maxNumRecords)
- .stream()
+ generateRecords(maxNumRecords).stream()
.map(bytes -> new RabbitMqMessage(bytes))
.collect(Collectors.toList());
p.apply(Create.of(data))
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java
index b707f6a395ca..eae04bc5ee5e 100644
--- a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java
@@ -143,8 +143,7 @@ public SyntheticSourceReader createReader(PipelineOptions pipelineOptions) {
: sourceOptions.forceNumInitialBundles;
List<SyntheticBoundedSource> res =
- bundleSplitter
- .getBundleSizes(desiredNumBundles, this.getStartOffset(), this.getEndOffset())
+ bundleSplitter.getBundleSizes(desiredNumBundles, this.getStartOffset(), this.getEndOffset())
.stream()
.map(offsetRange -> createSourceForSubrange(offsetRange.getFrom(), offsetRange.getTo()))
.collect(Collectors.toList());
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java
index a76b49389c3e..f9b4db2bcd70 100644
--- a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java
@@ -99,9 +99,7 @@ public void validate() {
: desiredNumSplits;
List<SyntheticUnboundedSource> splits =
- bundleSplitter
- .getBundleSizes(desiredNumBundles, startOffset, endOffset)
- .stream()
+ bundleSplitter.getBundleSizes(desiredNumBundles, startOffset, endOffset).stream()
.map(
offsetRange ->
new SyntheticUnboundedSource(
diff --git a/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java
index a5726618eb0e..1a30ee4a8118 100644
--- a/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java
+++ b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java
@@ -64,8 +64,7 @@ public void bundlesShouldBeEvenForConstDistribution() {
List<OffsetRange> bundleSizes = splitter.getBundleSizes(4, 0, options.numRecords);
- bundleSizes
- .stream()
+ bundleSizes.stream()
.map(range -> range.getTo() - range.getFrom())
.forEach(size -> assertEquals(expectedBundleSize, size.intValue()));
}
@@ -79,8 +78,7 @@ public void bundleSizesShouldBeProportionalToTheOneSuggestedInBundleSizeDistribu
List<OffsetRange> bundleSizes = splitter.getBundleSizes(4, 0, options.numRecords);
- bundleSizes
- .stream()
+ bundleSizes.stream()
.map(range -> range.getTo() - range.getFrom())
.forEach(size -> assertEquals(expectedBundleSize, size.intValue()));
}
diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
index 3c0309d3c0e1..0075190cbba9 100644
--- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
+++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
@@ -222,7 +222,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
}
Metadata metadata = getInputMetadata();
if (metadata != null) {
- //TODO: use metadata.toString() only without a trim() once Apache Tika 1.17 gets released
+ // TODO: use metadata.toString() only without a trim() once Apache Tika 1.17 gets released
builder.add(
DisplayData.item("inputMetadata", metadata.toString().trim())
.withLabel("Input Metadata"));
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index a6f5505aa89a..1d0e0191e9b7 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -715,9 +715,7 @@ public static void cleanUpSideInput(NexmarkConfiguration config) throws IOExcept
break;
case CSV:
FileSystems.delete(
- FileSystems.match(config.sideInputUrl + "*")
- .metadata()
- .stream()
+ FileSystems.match(config.sideInputUrl + "*").metadata().stream()
.map(metadata -> metadata.resourceId())
.collect(Collectors.toList()));
break;
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
index c5a339580efe..0c37e1555a43 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
@@ -142,9 +142,7 @@ public TopicPath reuseTopic(String shortTopic) throws IOException {
/** Does topic corresponding to short name exist? */
public boolean topicExists(String shortTopic) throws IOException {
TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
- return pubsubClient
- .listTopics(PubsubClient.projectPathFromId(project))
- .stream()
+ return pubsubClient.listTopics(PubsubClient.projectPathFromId(project)).stream()
.anyMatch(topic::equals);
}
@@ -199,9 +197,7 @@ public boolean subscriptionExists(String shortTopic, String shortSubscription)
TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
SubscriptionPath subscription =
PubsubClient.subscriptionPathFromName(project, shortSubscription);
- return pubsubClient
- .listSubscriptions(PubsubClient.projectPathFromId(project), topic)
- .stream()
+ return pubsubClient.listSubscriptions(PubsubClient.projectPathFromId(project), topic).stream()
.anyMatch(subscription::equals);
}
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
index a4ded8d77ce5..89b0cc6c101f 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
@@ -118,7 +118,7 @@ public void setMaxNumWorkers(int maxNumWorkers) {
/** Return channel for writing bytes to GCS. */
private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
throws IOException {
- //TODO
+ // TODO
// Fix after PR: right now this is a specific Google added use case
// Discuss it on ML: shall we keep GCS or use HDFS or use a generic beam filesystem way.
throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory");
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
index f353087b4461..05d7bf3990c7 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
@@ -247,7 +247,7 @@ public void processElement(
}
// Remember this person for any future auctions.
personState.write(newPerson);
- //set a time out to clear this state
+ // set a time out to clear this state
Instant firingTime =
new Instant(newPerson.dateTime).plus(Duration.standardSeconds(maxAuctionsWaitingTime));
timer.set(firingTime);
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java
index 688d4dcc19af..b3271415fe62 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java
@@ -101,16 +101,14 @@ private void flushSession(long bidder) {
Instant sessionStart =
Ordering.<Instant>natural()
.min(
- session
- .stream()
+ session.stream()
.<Instant>map(tsv -> tsv.getTimestamp())
.collect(Collectors.toList()));
Instant sessionEnd =
Ordering.<Instant>natural()
.max(
- session
- .stream()
+ session.stream()
.<Instant>map(tsv -> tsv.getTimestamp())
.collect(Collectors.toList()))
.plus(configuration.sessionGap);
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
index 42fd23d77108..396406ac8cb3 100644
--- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
@@ -79,9 +79,7 @@ public void createTableIfNotExists(String tableName, Map<String, String> schema)
if (client.getTable(tableId, FIELD_OPTIONS) == null) {
List<Field> schemaFields =
- schema
- .entrySet()
- .stream()
+ schema.entrySet().stream()
.map(entry -> Field.of(entry.getKey(), LegacySQLTypeName.valueOf(entry.getValue())))
.collect(Collectors.toList());
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java
index e7203e3b92bd..7bee2f123490 100644
--- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java
@@ -50,10 +50,7 @@ public void publish(TestResult result, String tableName) {
}
private Map<String, Object> getRowOfSchema(TestResult result) {
- return result
- .toMap()
- .entrySet()
- .stream()
+ return result.toMap().entrySet().stream()
.filter(element -> schema.containsKey(element.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
With regards,
Apache Git Services