Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/14 23:38:06 UTC

[beam] Diff for: [GitHub] kennknowles merged pull request #7505: [BEAM-5009] Pin spotless and googleJavaFormat to latest; apply globally
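
For context: the two build changes at the top of this diff pin the formatter toolchain. The spotless Gradle plugin moves from 3.7.0 to 3.16.0 on the buildscript classpath, and google-java-format is pinned to 1.7 inside BeamModulePlugin; everything after those two hunks is the mechanical reformatting the pinned versions produce when applied globally. As a minimal sketch (not Beam's actual multi-module wiring), a standalone single-module build could reproduce the same setup like this; the plugin coordinates and versions are taken from the diff, while the target path is illustrative:

    // build.gradle: sketch of pinning spotless and google-java-format,
    // mirroring the versions this PR pins; adjust the target to your layout.
    buildscript {
      repositories {
        gradlePluginPortal()
      }
      dependencies {
        // Same artifact and version the diff puts on the buildscript classpath.
        classpath "com.diffplug.spotless:spotless-plugin-gradle:3.16.0"
      }
    }

    apply plugin: 'com.diffplug.gradle.spotless'

    spotless {
      java {
        target 'src/**/*.java'   // illustrative; Beam configures this per module
        googleJavaFormat('1.7')  // pin the formatter, as in BeamModulePlugin.groovy
        // Details: https://github.com/diffplug/spotless/blob/master/PADDEDCELL.md
        paddedCell()
      }
    }

With that in place, ./gradlew spotlessCheck fails the build on formatting drift and ./gradlew spotlessApply rewrites the sources. The reflowed end-of-line comments and the ".stream()" calls joined onto their receivers throughout the rest of this diff are the sort of churn google-java-format 1.7 produces on a previously unformatted tree.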

diff --git a/build.gradle b/build.gradle
index f8d8ef2e9605..bb9de48e4f32 100644
--- a/build.gradle
+++ b/build.gradle
@@ -54,7 +54,7 @@ buildscript {
     classpath "io.spring.gradle:propdeps-plugin:0.0.9.RELEASE"                                          // Enable provided and optional configurations
     classpath "gradle.plugin.org.nosphere.apache:creadur-rat-gradle:0.3.1"                              // Enable Apache license enforcement
     classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.11.0"                                 // Enable Avro code generation
-    classpath "com.diffplug.spotless:spotless-plugin-gradle:3.7.0"                                      // Enable a code formatting plugin
+    classpath "com.diffplug.spotless:spotless-plugin-gradle:3.16.0"                                     // Enable a code formatting plugin
     classpath "gradle.plugin.com.github.blindpirate:gogradle:0.10"                                      // Enable Go code compilation
     classpath "gradle.plugin.com.palantir.gradle.docker:gradle-docker:0.20.1"                           // Enable building Docker containers
     classpath "cz.malohlava:visteg:1.0.3"                                                               // Enable generating Gradle task dependencies as ".dot" files
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 126b4c97dd2f..1d7ba4baba9a 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -727,7 +727,7 @@ class BeamModulePlugin implements Plugin<Project> {
         enforceCheck !disableSpotlessCheck
         java {
           licenseHeader javaLicenseHeader
-          googleJavaFormat()
+          googleJavaFormat('1.7')
 
           // Details see: https://github.com/diffplug/spotless/blob/master/PADDEDCELL.md
           paddedCell()
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
index 1cb8d34275c4..942e8f35a0a5 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Distinct.java
@@ -62,13 +62,12 @@
  */
 @Audience(Audience.Type.CLIENT)
 @Recommended(
-  reason =
-      "Might be useful to override the default "
-          + "implementation because of performance reasons"
-          + "(e.g. using bloom filters), which might reduce the space complexity",
-  state = StateComplexity.CONSTANT,
-  repartitions = 1
-)
+    reason =
+        "Might be useful to override the default "
+            + "implementation because of performance reasons"
+            + "(e.g. using bloom filters), which might reduce the space complexity",
+    state = StateComplexity.CONSTANT,
+    repartitions = 1)
 public class Distinct<InputT, OutputT> extends ShuffleOperator<InputT, OutputT, OutputT>
     implements CompositeOperator<InputT, OutputT> {
 
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
index ddd9eb023a98..e90d2441d76d 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java
@@ -71,13 +71,12 @@
  */
 @Audience(Audience.Type.CLIENT)
 @Recommended(
-  reason =
-      "Might be useful to override because of performance reasons in a "
-          + "specific join types (e.g. sort join), which might reduce the space "
-          + "complexity",
-  state = StateComplexity.LINEAR,
-  repartitions = 1
-)
+    reason =
+        "Might be useful to override because of performance reasons in a "
+            + "specific join types (e.g. sort join), which might reduce the space "
+            + "complexity",
+    state = StateComplexity.LINEAR,
+    repartitions = 1)
 public class Join<LeftT, RightT, KeyT, OutputT>
     extends ShuffleOperator<Object, KeyT, KV<KeyT, OutputT>> {
 
diff --git a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
index af2a518cbf76..6b1eb42789f2 100644
--- a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
+++ b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/ReduceByKey.java
@@ -87,13 +87,12 @@
  */
 @Audience(Audience.Type.CLIENT)
 @Recommended(
-  reason =
-      "Is very recommended to override because of performance in "
-          + "a specific area of (mostly) batch calculations where combiners "
-          + "can be efficiently used in the executor-specific implementation",
-  state = StateComplexity.CONSTANT_IF_COMBINABLE,
-  repartitions = 1
-)
+    reason =
+        "Is very recommended to override because of performance in "
+            + "a specific area of (mostly) batch calculations where combiners "
+            + "can be efficiently used in the executor-specific implementation",
+    state = StateComplexity.CONSTANT_IF_COMBINABLE,
+    repartitions = 1)
 public class ReduceByKey<InputT, KeyT, ValueT, OutputT>
     extends ShuffleOperator<InputT, KeyT, KV<KeyT, OutputT>> implements TypeAware.Value<ValueT> {
 
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
index cf8f89611c92..4b2eb8c774ea 100644
--- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
@@ -207,15 +207,15 @@ public void codersAndTypesSection() {
     final PipelineOptions options = PipelineOptionsFactory.create();
     Pipeline pipeline = Pipeline.create(options);
 
-    //Register `KryoCoderProvider` which attempt to use `KryoCoder` to every non-primitive type
+    // Register `KryoCoderProvider` which attempt to use `KryoCoder` to every non-primitive type
     KryoCoderProvider.of().registerTo(pipeline);
 
-    //Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types
+    // Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types
     options.as(KryoOptions.class).setKryoRegistrationRequired(true);
 
     KryoCoderProvider.of(
-            kryo -> { //KryoRegistrar of your uwn
-              kryo.register(KryoSerializedElementType.class); //other may follow
+            kryo -> { // KryoRegistrar of your uwn
+              kryo.register(KryoSerializedElementType.class); // other may follow
             })
         .registerTo(pipeline);
 
@@ -510,14 +510,15 @@ public void flatMapWithTimeExtractorOperator() {
                 new SomeEventObject(3),
                 new SomeEventObject(4)));
 
-    // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
+    // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods
+    // returns time-stamp
     PCollection<SomeEventObject> timeStampedEvents =
         FlatMap.named("extract-event-time")
             .of(events)
             .using((SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e))
             .eventTimeBy(SomeEventObject::getEventTimeInMillis)
             .output();
-    //Euphoria will now know event time for each event
+    // Euphoria will now know event time for each event
 
     pipeline.run();
   }
@@ -530,7 +531,7 @@ public void filterOperator() {
     // suppose nums contains: [0,  1, 2, 3, 4, 5, 6, 7, 8, 9]
     PCollection<Integer> divisibleBythree =
         Filter.named("divisibleByFive").of(nums).by(e -> e % 3 == 0).output();
-    //divisibleBythree will contain: [ 0, 3, 6, 9]
+    // divisibleBythree will contain: [ 0, 3, 6, 9]
 
     PAssert.that(divisibleBythree).containsInAnyOrder(0, 3, 6, 9);
     pipeline.run();
@@ -542,7 +543,7 @@ public void reduceByKeyTestOperator1() {
     PCollection<String> animals =
         pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));
 
-    //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+    // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
     PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
         ReduceByKey.named("to-letters-couts")
             .of(animals)
@@ -551,7 +552,8 @@ public void reduceByKeyTestOperator1() {
             .valueBy(e -> 1)
             .reduceBy(Stream::count)
             .output();
-    // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+    // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5,
+    // 1L), KV.of(8, 1L) ]
 
     PAssert.that(countOfAnimalNamesByLength)
         .containsInAnyOrder(
@@ -566,7 +568,7 @@ public void reduceByKeyTestOperatorCombinable() {
     PCollection<String> animals =
         pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));
 
-    //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+    // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
     PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
         ReduceByKey.named("to-letters-couts")
             .of(animals)
@@ -575,7 +577,8 @@ public void reduceByKeyTestOperatorCombinable() {
             .valueBy(e -> 1L)
             .combineBy(s -> s.mapToLong(l -> l).sum())
             .output();
-    // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+    // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5,
+    // 1L), KV.of(8, 1L) ]
 
     PAssert.that(countOfAnimalNamesByLength)
         .containsInAnyOrder(
@@ -590,7 +593,7 @@ public void reduceByKeyTestOperatorContext() {
     PCollection<String> animals =
         pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));
 
-    //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+    // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
     PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
         ReduceByKey.named("to-letters-couts")
             .of(animals)
@@ -603,7 +606,8 @@ public void reduceByKeyTestOperatorContext() {
                   collector.asContext().getCounter("num-of-keys").increment();
                 })
             .output();
-    // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+    // countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5,
+    // 1L), KV.of(8, 1L) ]
 
     PAssert.that(countOfAnimalNamesByLength)
         .containsInAnyOrder(
@@ -659,7 +663,7 @@ public void reduceByKeyTestOperatorFold() {
     PCollection<String> animals =
         pipeline.apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"));
 
-    //suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
+    // suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
     PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
         ReduceByKey.named("to-letters-couts")
             .of(animals)
@@ -668,7 +672,8 @@ public void reduceByKeyTestOperatorFold() {
             .valueBy(e -> 1L)
             .combineBy(Fold.of((l1, l2) -> l1 + l2))
             .output();
-    // countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
+    // countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5,
+    // 1L), KV.of(8, 1L) ]
 
     PAssert.that(countOfAnimalNamesByLength)
         .containsInAnyOrder(
@@ -681,7 +686,7 @@ public void reduceByKeyTestOperatorFold() {
   public void testSumByKeyOperator() {
     PCollection<Integer> input = pipeline.apply(Create.of(asList(1, 2, 3, 4, 5, 6, 7, 8, 9)));
 
-    //suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
+    // suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
     PCollection<KV<Integer, Long>> output =
         SumByKey.named("sum-odd-and-even")
             .of(input)
@@ -707,11 +712,12 @@ public void testUnionOperator() {
             .apply("rodents", Create.of("squirrel", "mouse", "rat", "lemming", "beaver"))
             .setTypeDescriptor(TypeDescriptors.strings());
 
-    //suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
-    //suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
+    // suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
+    // suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
     PCollection<String> animals = Union.named("to-animals").of(cats, rodents).output();
 
-    // animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"
+    // animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat",
+    // "lemming", "beaver"
     PAssert.that(animals)
         .containsInAnyOrder(
             "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver");
@@ -732,7 +738,8 @@ public void testAssignEventTimeOperator() {
                     new SomeEventObject(3),
                     new SomeEventObject(4))));
 
-    // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
+    // suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods
+    // returns time-stamp
     PCollection<SomeEventObject> timeStampedEvents =
         AssignEventTime.named("extract-event-time")
             .of(events)
@@ -795,17 +802,18 @@ public void testTopPerKeyOperator() {
                 "duck",
                 "caterpillar"));
 
-    // suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ]
+    // suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat",
+    // "duck", "caterpillar" ]
     PCollection<Triple<Character, String, Integer>> longestNamesByLetter =
         TopPerKey.named("longest-animal-names")
             .of(animals)
             .keyBy(name -> name.charAt(0)) // first character is the key
             .valueBy(UnaryFunction.identity()) // value type is the same as input element type
-            .scoreBy(
-                String
-                    ::length) // length defines score, note that Integer implements Comparable<Integer>
+            .scoreBy(String::length) // length defines score, note that Integer implements
+            // Comparable<Integer>
             .output();
-    // longestNamesByLetter will contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
+    // longestNamesByLetter will contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant",
+    // 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
 
     PAssert.that(longestNamesByLetter)
         .containsInAnyOrder(
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java
index 991879d428dd..b183799e070e 100644
--- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/WindowingTest.java
@@ -17,40 +17,40 @@
  */
 package org.apache.beam.sdk.extensions.euphoria.core.testkit;
 //
-//import static org.junit.Assert.assertEquals;
-//
-//import java.time.Instant;
-//import java.util.Arrays;
-//import java.util.List;
-//import java.util.Objects;
-//import java.util.concurrent.atomic.AtomicBoolean;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.TimeInterval;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceStateByKey;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.State;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.StateContext;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorage;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorageDescriptor;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
-//import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
-//import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.AbstractOperatorTest;
-//import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.Processing;
-//import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-//import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-//import org.apache.beam.sdk.values.KV;
-//import org.junit.Test;
-//
-///** Tests capabilities of {@link Windowing}. */
-//@Processing(Processing.Type.ALL)
-//public class WindowingTest extends AbstractOperatorTest {
+// import static org.junit.Assert.assertEquals;
+//
+// import java.time.Instant;
+// import java.util.Arrays;
+// import java.util.List;
+// import java.util.Objects;
+// import java.util.concurrent.atomic.AtomicBoolean;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.TimeInterval;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceStateByKey;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.State;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.StateContext;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorage;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorageDescriptor;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
+// import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
+// import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.AbstractOperatorTest;
+// import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.Processing;
+// import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+// import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+// import org.apache.beam.sdk.values.KV;
+// import org.junit.Test;
+//
+/// ** Tests capabilities of {@link Windowing}. */
+// @Processing(Processing.Type.ALL)
+// public class WindowingTest extends AbstractOperatorTest {
 //
 //  static final AtomicBoolean ON_CLEAR_VALIDATED = new AtomicBoolean(false);
 //
@@ -270,7 +270,8 @@
 //            // extract window timestamp
 //            return FlatMap.of(keyValues)
 //                .using(
-//                    (KV<String, Integer> in, Collector<Triple<Instant, Instant, Integer>> out) -> {
+//                    (KV<String, Integer> in, Collector<Triple<Instant, Instant, Integer>> out) ->
+// {
 //                      long windowBegin = ((TimeInterval) out.getWindow()).getStartMillis();
 //                      long windowEnd = ((TimeInterval) out.getWindow()).getEndMillis();
 //                      out.collect(
@@ -485,4 +486,4 @@
 //    }
 //  }
 //  */
-//}
+// }
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java
index daff0621a462..1e4d6e827da2 100644
--- a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BroadcastHashJoinTranslatorTest.java
@@ -55,18 +55,19 @@ public void twoUsesOneViewTest() {
 
     // create input to be broadcast
     PCollection<KV<Integer, String>> lengthStrings =
-        p.apply("names",
-            Create.of(KV.of(1, "one"), KV.of(2, "two"), KV.of(3, "three")))
-                .setTypeDescriptor(
-                    TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()));
+        p.apply("names", Create.of(KV.of(1, "one"), KV.of(2, "two"), KV.of(3, "three")))
+            .setTypeDescriptor(
+                TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()));
 
     UnaryFunction<KV<Integer, String>, Integer> sharedKeyExtractor = KV::getKey;
 
     // other datasets to be joined with
     PCollection<String> letters =
-        p.apply("letters", Create.of("a", "b", "c", "d")).setTypeDescriptor(TypeDescriptors.strings());
+        p.apply("letters", Create.of("a", "b", "c", "d"))
+            .setTypeDescriptor(TypeDescriptors.strings());
     PCollection<String> acronyms =
-        p.apply("acronyms", Create.of("B2K", "DIY", "FKA", "EOBD")).setTypeDescriptor(TypeDescriptors.strings());
+        p.apply("acronyms", Create.of("B2K", "DIY", "FKA", "EOBD"))
+            .setTypeDescriptor(TypeDescriptors.strings());
 
     PCollection<KV<Integer, String>> lettersJoined =
         LeftJoin.named("join-letters-with-lengths")
@@ -88,7 +89,6 @@ public void twoUsesOneViewTest() {
                 TypeDescriptors.strings())
             .output();
 
-
     PAssert.that(lettersJoined)
         .containsInAnyOrder(
             KV.of(1, "a-one"), KV.of(1, "b-one"), KV.of(1, "c-one"), KV.of(1, "d-one"));
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 6caf76033b2f..0e9127ae7631 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -106,7 +106,7 @@ public static void startDatabase() throws Exception {
           derbyServer.ping();
           started = true;
         } catch (Throwable t) {
-          //ignore, still trying to start
+          // ignore, still trying to start
         }
       }
     }
diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 76139efe1777..b08a848ddc72 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -360,7 +360,8 @@ public void testCheckpointMarkSafety() throws Exception {
     session.close();
     connection.close();
 
-    // create a JmsIO.Read with a decorated ConnectionFactory which will introduce a delay in sending
+    // create a JmsIO.Read with a decorated ConnectionFactory which will introduce a delay in
+    // sending
     // acknowledgements - this should help uncover threading issues around checkpoint management.
     JmsIO.Read spec =
         JmsIO.read()
@@ -383,7 +384,8 @@ public void testCheckpointMarkSafety() throws Exception {
     // the messages are still pending in the queue (no ACK yet)
     assertEquals(messagesToProcess, count(QUEUE));
 
-    // we finalize the checkpoint for the already-processed messages while simultaneously consuming the remainder of
+    // we finalize the checkpoint for the already-processed messages while simultaneously consuming
+    // the remainder of
     // messages from the queue
     Thread runner =
         new Thread(
@@ -399,7 +401,8 @@ public void testCheckpointMarkSafety() throws Exception {
     runner.start();
     reader.getCheckpointMark().finalizeCheckpoint();
 
-    // Concurrency issues would cause an exception to be thrown before this method exits, failing the test
+    // Concurrency issues would cause an exception to be thrown before this method exits, failing
+    // the test
     runner.join();
   }
 
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 41efb690c33a..bd46f711cf46 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -114,7 +114,7 @@ public void verifyDeterministic() throws NonDeterministicException {
   @Override
   public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value) {
     return kvCoder.isRegisterByteSizeObserverCheap(value.getKV());
-    //TODO : do we have to implement getEncodedSize()?
+    // TODO : do we have to implement getEncodedSize()?
   }
 
   @SuppressWarnings("unchecked")
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 22f595febc69..ee058aa8e226 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -268,8 +268,7 @@ public Instant getWatermark() {
     }
 
     // Return minimum watermark among partitions.
-    return partitionStates
-        .stream()
+    return partitionStates.stream()
         .map(PartitionState::updateAndGetWatermark)
         .min(Comparator.naturalOrder())
         .get();
@@ -279,8 +278,7 @@ public Instant getWatermark() {
   public CheckpointMark getCheckpointMark() {
     reportBacklog();
     return new KafkaCheckpointMark(
-        partitionStates
-            .stream()
+        partitionStates.stream()
             .map(
                 p ->
                     new PartitionMark(
@@ -394,7 +392,7 @@ public long getSplitBacklogBytes() {
 
   private static final long UNINITIALIZED_OFFSET = -1;
 
-  //Add SpEL instance to cover the interface difference of Kafka client
+  // Add SpEL instance to cover the interface difference of Kafka client
   private transient ConsumerSpEL consumerSpEL;
 
   /** watermark before any records have been read. */
@@ -604,9 +602,7 @@ private void commitCheckpointMark(KafkaCheckpointMark checkpointMark) {
     LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
 
     consumer.commitSync(
-        checkpointMark
-            .getPartitions()
-            .stream()
+        checkpointMark.getPartitions().stream()
             .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
             .collect(
                 Collectors.toMap(
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
index e1845d3dbc7b..5fbe2727b95f 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelayTest.java
@@ -42,8 +42,7 @@
   private static List<Long> getTimestampsForRecords(
       TimestampPolicy<String, String> policy, Instant now, List<Long> timestampOffsets) {
 
-    return timestampOffsets
-        .stream()
+    return timestampOffsets.stream()
         .map(
             ts -> {
               Instant result =
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 6d25b42ab62e..63e14a3b2639 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -238,9 +238,7 @@ public synchronized void assign(final Collection<TopicPartition> assigned) {
           @Override
           public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
               Map<TopicPartition, Long> timestampsToSearch) {
-            return timestampsToSearch
-                .entrySet()
-                .stream()
+            return timestampsToSearch.entrySet().stream()
                 .map(
                     e -> {
                       // In test scope, timestamp == offset.
@@ -283,10 +281,11 @@ public void run() {
               if (config.get("inject.error.at.eof") != null) {
                 consumer.setException(new KafkaException("Injected error in consumer.poll()"));
               }
-              // MockConsumer.poll(timeout) does not actually wait even when there aren't any records.
+              // MockConsumer.poll(timeout) does not actually wait even when there aren't any
+              // records.
               // Add a small wait here in order to avoid busy looping in the reader.
               Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
-              //TODO: BEAM-4086: testUnboundedSourceWithoutBoundedWrapper() occasionally hangs
+              // TODO: BEAM-4086: testUnboundedSourceWithoutBoundedWrapper() occasionally hangs
               //     without this wait. Need to look into it.
             }
             consumer.schedulePollTask(this);
@@ -1583,11 +1582,13 @@ private static void verifyProducerRecords(
       producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
       mockProducer =
           new MockProducer<Integer, Long>(
-              false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
+              false, // disable synchronous completion of send. see ProducerSendCompletionThread
+              // below.
               new IntegerSerializer(),
               new LongSerializer()) {
 
-            // override flush() so that it does not complete all the waiting sends, giving a chance to
+            // override flush() so that it does not complete all the waiting sends, giving a chance
+            // to
             // ProducerCompletionThread to inject errors.
 
             @Override
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
index 1bb971e5e7ee..362f5d393808 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
@@ -62,8 +62,7 @@ public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
         shardsAtStartingPoint,
         startingPoint.getTimestamp());
     return new KinesisReaderCheckpoint(
-        shardsAtStartingPoint
-            .stream()
+        shardsAtStartingPoint.stream()
             .map(shard -> new ShardCheckpoint(streamName, shard.getShardId(), startingPoint))
             .collect(Collectors.toList()));
   }
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
index 8a9afd844c54..6fefb43dee0f 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
@@ -35,8 +35,7 @@ public GetKinesisRecordsResult(
       final String streamName,
       final String shardId) {
     this.records =
-        records
-            .stream()
+        records.stream()
             .map(
                 input -> {
                   assert input != null; // to make FindBugs happy
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
index 6b5d841b0ff9..3f41639834af 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
@@ -215,9 +215,7 @@ boolean allShardsUpToDate() {
   KinesisReaderCheckpoint getCheckpointMark() {
     ImmutableMap<String, ShardRecordsIterator> currentShardIterators = shardIteratorsMap.get();
     return new KinesisReaderCheckpoint(
-        currentShardIterators
-            .values()
-            .stream()
+        currentShardIterators.values().stream()
             .map(
                 shardRecordsIterator -> {
                   checkArgument(
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index 5e3597d969dd..4749d2f9797a 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -159,8 +159,7 @@ public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
     @Override
     public AmazonKinesis getKinesisClient() {
       return new AmazonKinesisMock(
-          shardedData
-              .stream()
+          shardedData.stream()
               .map(testDatas -> transform(testDatas, TestData::convertToRecord))
               .collect(Collectors.toList()),
           numberOfRecordsPerGet);
diff --git a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
index 69394bc858c5..126b70682bc1 100644
--- a/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
+++ b/sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
@@ -322,9 +322,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
       } else {
         Stream<BoundedSource<T>> sources =
-            spec.getKuduService()
-                .createTabletScanners(spec)
-                .stream()
+            spec.getKuduService().createTabletScanners(spec).stream()
                 .map(s -> new KuduIO.KuduSource<T>(spec, spec.getCoder(), s));
         return sources.collect(Collectors.toList());
       }
diff --git a/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java b/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java
index d403c38fdcdf..13b92406ad16 100644
--- a/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java
+++ b/sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java
@@ -103,8 +103,7 @@ public void testReadQueue() throws Exception {
                         new String(message.getBody(), StandardCharsets.UTF_8)));
 
     List<String> records =
-        generateRecords(maxNumRecords)
-            .stream()
+        generateRecords(maxNumRecords).stream()
             .map(record -> new String(record, StandardCharsets.UTF_8))
             .collect(Collectors.toList());
     PAssert.that(output).containsInAnyOrder(records);
@@ -149,8 +148,7 @@ public void testReadExchange() throws Exception {
                         new String(message.getBody(), StandardCharsets.UTF_8)));
 
     List<String> records =
-        generateRecords(maxNumRecords)
-            .stream()
+        generateRecords(maxNumRecords).stream()
             .map(record -> new String(record, StandardCharsets.UTF_8))
             .collect(Collectors.toList());
     PAssert.that(output).containsInAnyOrder(records);
@@ -201,8 +199,7 @@ public void testReadExchange() throws Exception {
   public void testWriteQueue() throws Exception {
     final int maxNumRecords = 1000;
     List<RabbitMqMessage> data =
-        generateRecords(maxNumRecords)
-            .stream()
+        generateRecords(maxNumRecords).stream()
             .map(bytes -> new RabbitMqMessage(bytes))
             .collect(Collectors.toList());
     p.apply(Create.of(data))
@@ -245,8 +242,7 @@ public void testWriteQueue() throws Exception {
   public void testWriteExchange() throws Exception {
     final int maxNumRecords = 1000;
     List<RabbitMqMessage> data =
-        generateRecords(maxNumRecords)
-            .stream()
+        generateRecords(maxNumRecords).stream()
             .map(bytes -> new RabbitMqMessage(bytes))
             .collect(Collectors.toList());
     p.apply(Create.of(data))
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java
index b707f6a395ca..eae04bc5ee5e 100644
--- a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedSource.java
@@ -143,8 +143,7 @@ public SyntheticSourceReader createReader(PipelineOptions pipelineOptions) {
             : sourceOptions.forceNumInitialBundles;
 
     List<SyntheticBoundedSource> res =
-        bundleSplitter
-            .getBundleSizes(desiredNumBundles, this.getStartOffset(), this.getEndOffset())
+        bundleSplitter.getBundleSizes(desiredNumBundles, this.getStartOffset(), this.getEndOffset())
             .stream()
             .map(offsetRange -> createSourceForSubrange(offsetRange.getFrom(), offsetRange.getTo()))
             .collect(Collectors.toList());
diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java
index a76b49389c3e..f9b4db2bcd70 100644
--- a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java
+++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUnboundedSource.java
@@ -99,9 +99,7 @@ public void validate() {
             : desiredNumSplits;
 
     List<SyntheticUnboundedSource> splits =
-        bundleSplitter
-            .getBundleSizes(desiredNumBundles, startOffset, endOffset)
-            .stream()
+        bundleSplitter.getBundleSizes(desiredNumBundles, startOffset, endOffset).stream()
             .map(
                 offsetRange ->
                     new SyntheticUnboundedSource(
diff --git a/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java
index a5726618eb0e..1a30ee4a8118 100644
--- a/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java
+++ b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/BundleSplitterTest.java
@@ -64,8 +64,7 @@ public void bundlesShouldBeEvenForConstDistribution() {
 
     List<OffsetRange> bundleSizes = splitter.getBundleSizes(4, 0, options.numRecords);
 
-    bundleSizes
-        .stream()
+    bundleSizes.stream()
         .map(range -> range.getTo() - range.getFrom())
         .forEach(size -> assertEquals(expectedBundleSize, size.intValue()));
   }
@@ -79,8 +78,7 @@ public void bundleSizesShouldBeProportionalToTheOneSuggestedInBundleSizeDistribu
 
     List<OffsetRange> bundleSizes = splitter.getBundleSizes(4, 0, options.numRecords);
 
-    bundleSizes
-        .stream()
+    bundleSizes.stream()
         .map(range -> range.getTo() - range.getFrom())
         .forEach(size -> assertEquals(expectedBundleSize, size.intValue()));
   }
diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
index 3c0309d3c0e1..0075190cbba9 100644
--- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
+++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
@@ -222,7 +222,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
       }
       Metadata metadata = getInputMetadata();
       if (metadata != null) {
-        //TODO: use metadata.toString() only without a trim() once Apache Tika 1.17 gets released
+        // TODO: use metadata.toString() only without a trim() once Apache Tika 1.17 gets released
         builder.add(
             DisplayData.item("inputMetadata", metadata.toString().trim())
                 .withLabel("Input Metadata"));
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index a6f5505aa89a..1d0e0191e9b7 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -715,9 +715,7 @@ public static void cleanUpSideInput(NexmarkConfiguration config) throws IOExcept
         break;
       case CSV:
         FileSystems.delete(
-            FileSystems.match(config.sideInputUrl + "*")
-                .metadata()
-                .stream()
+            FileSystems.match(config.sideInputUrl + "*").metadata().stream()
                 .map(metadata -> metadata.resourceId())
                 .collect(Collectors.toList()));
         break;
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
index c5a339580efe..0c37e1555a43 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/PubsubHelper.java
@@ -142,9 +142,7 @@ public TopicPath reuseTopic(String shortTopic) throws IOException {
   /** Does topic corresponding to short name exist? */
   public boolean topicExists(String shortTopic) throws IOException {
     TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
-    return pubsubClient
-        .listTopics(PubsubClient.projectPathFromId(project))
-        .stream()
+    return pubsubClient.listTopics(PubsubClient.projectPathFromId(project)).stream()
         .anyMatch(topic::equals);
   }
 
@@ -199,9 +197,7 @@ public boolean subscriptionExists(String shortTopic, String shortSubscription)
     TopicPath topic = PubsubClient.topicPathFromName(project, shortTopic);
     SubscriptionPath subscription =
         PubsubClient.subscriptionPathFromName(project, shortSubscription);
-    return pubsubClient
-        .listSubscriptions(PubsubClient.projectPathFromId(project), topic)
-        .stream()
+    return pubsubClient.listSubscriptions(PubsubClient.projectPathFromId(project), topic).stream()
         .anyMatch(subscription::equals);
   }
 
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
index a4ded8d77ce5..89b0cc6c101f 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
@@ -118,7 +118,7 @@ public void setMaxNumWorkers(int maxNumWorkers) {
   /** Return channel for writing bytes to GCS. */
   private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
       throws IOException {
-    //TODO
+    // TODO
     // Fix after PR: right now this is a specific Google added use case
     // Discuss it on ML: shall we keep GCS or use HDFS or use a generic beam filesystem way.
     throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory");
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
index f353087b4461..05d7bf3990c7 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
@@ -247,7 +247,7 @@ public void processElement(
         }
         // Remember this person for any future auctions.
         personState.write(newPerson);
-        //set a time out to clear this state
+        // set a time out to clear this state
         Instant firingTime =
             new Instant(newPerson.dateTime).plus(Duration.standardSeconds(maxAuctionsWaitingTime));
         timer.set(firingTime);
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java
index 688d4dcc19af..b3271415fe62 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/SessionSideInputJoinModel.java
@@ -101,16 +101,14 @@ private void flushSession(long bidder) {
       Instant sessionStart =
           Ordering.<Instant>natural()
               .min(
-                  session
-                      .stream()
+                  session.stream()
                       .<Instant>map(tsv -> tsv.getTimestamp())
                       .collect(Collectors.toList()));
 
       Instant sessionEnd =
           Ordering.<Instant>natural()
               .max(
-                  session
-                      .stream()
+                  session.stream()
                       .<Instant>map(tsv -> tsv.getTimestamp())
                       .collect(Collectors.toList()))
               .plus(configuration.sessionGap);
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
index 42fd23d77108..396406ac8cb3 100644
--- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryClient.java
@@ -79,9 +79,7 @@ public void createTableIfNotExists(String tableName, Map<String, String> schema)
 
     if (client.getTable(tableId, FIELD_OPTIONS) == null) {
       List<Field> schemaFields =
-          schema
-              .entrySet()
-              .stream()
+          schema.entrySet().stream()
               .map(entry -> Field.of(entry.getKey(), LegacySQLTypeName.valueOf(entry.getValue())))
               .collect(Collectors.toList());
 
diff --git a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java
index e7203e3b92bd..7bee2f123490 100644
--- a/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java
+++ b/sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/BigQueryResultsPublisher.java
@@ -50,10 +50,7 @@ public void publish(TestResult result, String tableName) {
   }
 
   private Map<String, Object> getRowOfSchema(TestResult result) {
-    return result
-        .toMap()
-        .entrySet()
-        .stream()
+    return result.toMap().entrySet().stream()
         .filter(element -> schema.containsKey(element.getKey()))
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }


With regards,
Apache Git Services