You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/16 02:09:46 UTC
[beam] Diff for: [GitHub] reuvenlax merged pull request #7523: Apply
spotless across Beam
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 4a516bb0547a..100869ae903c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -186,7 +186,8 @@ static void runWindowedWordCount(Options options) throws IOException {
pipeline
/* Read from the GCS file. */
.apply(TextIO.read().from(options.getInputFile()))
- // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
+ // Concept #2: Add an element timestamp, using an artificial time just to show
+ // windowing.
// See AddTimestampFn for more detail on this.
.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
index 6bcae8da28f3..14a11b71b5a8 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java
@@ -224,7 +224,8 @@ private void tearDown() {
Transport.getJsonFactory(),
chainHttpRequestInitializer(
options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
+ // Do not log 404. It clutters the output and is possibly even required by the
+ // caller.
new RetryHttpRequestInitializer(ImmutableList.of(404))))
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
@@ -237,7 +238,8 @@ private void tearDown() {
Transport.getJsonFactory(),
chainHttpRequestInitializer(
options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
+ // Do not log 404. It clutters the output and is possibly even required by the
+ // caller.
new RetryHttpRequestInitializer(ImmutableList.of(404))))
.setRootUrl(options.getPubsubRootUrl())
.setApplicationName(options.getAppName())
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 9a3bfc26d5b6..bc6541341b8b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -122,7 +122,7 @@
* and then exits.
*/
public class TriggerExample {
- //Numeric value of fixed window duration, in minutes
+ // Numeric value of fixed window duration, in minutes
public static final int WINDOW_DURATION = 30;
// Constants used in triggers.
// Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
@@ -189,18 +189,22 @@
.apply(
"Default",
Window
- // The default window duration values work well if you're running the default input
+ // The default window duration values work well if you're running the default
+ // input
// file. You may want to adjust the window duration otherwise.
.<KV<String, Integer>>into(
FixedWindows.of(Duration.standardMinutes(windowDuration)))
- // The default trigger first emits output when the system's watermark passes the end
+ // The default trigger first emits output when the system's watermark passes
+ // the end
// of the window.
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
// Late data is dropped
.withAllowedLateness(Duration.ZERO)
// Discard elements after emitting each pane.
- // With no allowed lateness and the specified trigger there will only be a single
- // pane, so this doesn't have a noticeable effect. See concept 2 for more details.
+ // With no allowed lateness and the specified trigger there will only be a
+ // single
+ // pane, so this doesn't have a noticeable effect. See concept 2 for more
+ // details.
.discardingFiredPanes())
.apply(new TotalFlow("default"));
@@ -229,7 +233,8 @@
FixedWindows.of(Duration.standardMinutes(windowDuration)))
// Late data is emitted as it arrives
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
- // Once the output is produced, the pane is dropped and we start preparing the next
+ // Once the output is produced, the pane is dropped and we start preparing the
+ // next
// pane for the window
.discardingFiredPanes()
// Late data is handled up to one day
@@ -264,8 +269,10 @@
AfterProcessingTime.pastFirstElementInPane()
// Speculative every ONE_MINUTE
.plusDelayOf(ONE_MINUTE)))
- // After emitting each pane, it will continue accumulating the elements so that each
- // approximation includes all of the previous data in addition to the newly arrived
+ // After emitting each pane, it will continue accumulating the elements so
+ // that each
+ // approximation includes all of the previous data in addition to the newly
+ // arrived
// data.
.accumulatingFiredPanes()
.withAllowedLateness(ONE_DAY))
@@ -414,7 +421,7 @@ public void processElement(ProcessContext c) throws Exception {
return;
}
if (laneInfo.length < VALID_NUM_FIELDS) {
- //Skip the invalid input.
+ // Skip the invalid input.
return;
}
String freeway = laneInfo[2];
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
index 6dd112f5e2d7..962d5fae134e 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -129,14 +129,16 @@ public void testTeamScoresSpeculative() {
.addElements(
event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)),
event(TestUser.BLUE_ONE, 2, Duration.standardMinutes(1)))
- // Some time passes within the runner, which causes a speculative pane containing the blue
+ // Some time passes within the runner, which causes a speculative pane containing the
+ // blue
// team's score to be emitted
.advanceProcessingTime(Duration.standardMinutes(10))
.addElements(event(TestUser.RED_TWO, 5, Duration.standardMinutes(3)))
// Some additional time passes and we get a speculative pane for the red team
.advanceProcessingTime(Duration.standardMinutes(12))
.addElements(event(TestUser.BLUE_TWO, 3, Duration.standardSeconds(22)))
- // More time passes and a speculative pane containing a refined value for the blue pane is
+ // More time passes and a speculative pane containing a refined value for the blue pane
+ // is
// emitted
.advanceProcessingTime(Duration.standardMinutes(10))
// Some more events occur
@@ -238,7 +240,8 @@ public void testTeamScoresObservablyLate() {
event(TestUser.RED_TWO, 2, Duration.ZERO),
event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)),
event(TestUser.RED_TWO, 3, Duration.standardMinutes(3)))
- // A late refinement is emitted due to the advance in processing time, but the window has
+ // A late refinement is emitted due to the advance in processing time, but the window
+ // has
// not yet closed because the watermark has not advanced
.advanceProcessingTime(Duration.standardMinutes(12))
// These elements should appear in the final pane
@@ -303,7 +306,8 @@ public void testTeamScoresDroppablyLate() {
.plus(ALLOWED_LATENESS)
.plus(TEAM_WINDOW_DURATION)
.plus(Duration.standardMinutes(1)))
- // These elements within the expired window are droppably late, and will not appear in the
+ // These elements within the expired window are droppably late, and will not appear in
+ // the
// output
.addElements(
event(
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
index bd10307f7e3d..c53f48f0ff93 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
@@ -33,7 +33,7 @@
private TestApexRunner(ApexPipelineOptions options) {
options.setEmbeddedExecution(true);
- //options.setEmbeddedExecutionDebugMode(false);
+ // options.setEmbeddedExecutionDebugMode(false);
this.delegate = ApexRunner.fromOptions(options);
}
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index ca1c7ffa24de..eb625e4acc15 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -77,9 +77,7 @@ public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationC
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Map<TupleTag<?>, Coder<?>> outputCoders =
- outputs
- .entrySet()
- .stream()
+ outputs.entrySet().stream()
.filter(e -> e.getValue() instanceof PCollection)
.collect(
Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
@@ -138,9 +136,7 @@ public void translate(
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Map<TupleTag<?>, Coder<?>> outputCoders =
- outputs
- .entrySet()
- .stream()
+ outputs.entrySet().stream()
.filter(e -> e.getValue() instanceof PCollection)
.collect(
Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
@@ -221,8 +217,8 @@ static void addSideInputs(
.getWindowingStrategy()
.equals(firstSideInput.getWindowingStrategy())) {
// TODO: check how to handle this in stream codec
- //String msg = "Multiple side inputs with different window strategies.";
- //throw new UnsupportedOperationException(msg);
+ // String msg = "Multiple side inputs with different window strategies.";
+ // throw new UnsupportedOperationException(msg);
LOG.warn(
"Side inputs union with different windowing strategies {} {}",
firstSideInput.getWindowingStrategy(),
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 7a5fbaa8ec86..618b7a79e2ee 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -382,7 +382,7 @@ private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
checkState(
minEventTimeTimer >= currentInputWatermark,
"Event time timer processing generates new timer(s) behind watermark.");
- //LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer,
+ // LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer,
// currentInputWatermark);
// TODO: is this the right way to trigger processing time timers?
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
index 5746dcab4e5d..7df1e2bcf483 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
@@ -56,8 +56,8 @@ public void testGetYarnDeployDependencies() throws Exception {
List<File> deps = ApexYarnLauncher.getYarnDeployDependencies();
String depsToString = deps.toString();
// the beam dependencies are not present as jar when running within the Maven build reactor
- //assertThat(depsToString, containsString("beam-runners-core-"));
- //assertThat(depsToString, containsString("beam-runners-apex-"));
+ // assertThat(depsToString, containsString("beam-runners-core-"));
+ // assertThat(depsToString, containsString("beam-runners-apex-"));
assertThat(depsToString, containsString("apex-common-"));
assertThat(depsToString, not(containsString("hadoop-")));
assertThat(depsToString, not(containsString("zookeeper-")));
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 025c7d642cec..70c92762018b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -92,7 +92,8 @@ public FunctionSpec translate(
/** Produces a {@link RunnerApi.CombinePayload} from a {@link Combine}. */
static <K, InputT, OutputT> CombinePayload payloadForCombine(
final AppliedPTransform<
- PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>,
+ PCollection<KV<K, InputT>>,
+ PCollection<KV<K, OutputT>>,
Combine.PerKey<K, InputT, OutputT>>
combine,
final SdkComponents components)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index f812de1dbabc..daf9c48dff53 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -53,7 +53,8 @@
@Deprecated
public static <ElemT, ViewT> PCollectionView<ViewT> getView(
AppliedPTransform<
- PCollection<ElemT>, PCollection<ElemT>,
+ PCollection<ElemT>,
+ PCollection<ElemT>,
PTransform<PCollection<ElemT>, PCollection<ElemT>>>
application)
throws IOException {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
index fb04428fd03e..bfa926d1405b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
@@ -112,10 +112,7 @@ public static String generateNameFromTransformNames(
} else {
// Enumerate the outer stages with their composite structure, if any.
parts =
- groupByOuter
- .asMap()
- .entrySet()
- .stream()
+ groupByOuter.asMap().entrySet().stream()
.map(
outer ->
String.format(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index fb4948b5f246..463e51dcbaea 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -301,10 +301,7 @@ public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, ?> ap
}
public static Map<TupleTag<?>, Coder<?>> getOutputCoders(AppliedPTransform<?, ?, ?> application) {
- return application
- .getOutputs()
- .entrySet()
- .stream()
+ return application.getOutputs().entrySet().stream()
.filter(e -> e.getValue() instanceof PCollection)
.collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
index b4a917ca3dce..23a23d16b7dc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
@@ -100,7 +100,7 @@ public static PipelineOptions fromJson(String optionsJson) {
Map<String, Object> probingOptionsMap =
MAPPER.readValue(optionsJson, new TypeReference<Map<String, Object>>() {});
if (probingOptionsMap.containsKey("options")) {
- //Legacy options.
+ // Legacy options.
return MAPPER.readValue(optionsJson, PipelineOptions.class);
} else {
// Fn Options with namespace and version.
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
index 94f8c3ba1902..a7a914c54a17 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
@@ -80,8 +80,7 @@
*/
public static List<String> prepareFilesForStaging(
List<String> resourcesToStage, String tmpJarLocation) {
- return resourcesToStage
- .stream()
+ return resourcesToStage.stream()
.map(File::new)
.filter(File::exists)
.map(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
index afd3d7012a38..8863c66cfe2e 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -152,9 +152,7 @@ public void visitPrimitiveTransform(Node node) {
// throws UnsupportedOperationException.
transformBuilder.clearSubtransforms();
transformBuilder.addAllSubtransforms(
- transform
- .getSubtransformsList()
- .stream()
+ transform.getSubtransformsList().stream()
.filter(id -> !viewTransforms.contains(id))
.collect(Collectors.toList()));
newTransforms.put(transformId, transformBuilder.build());
@@ -168,9 +166,7 @@ public void visitPrimitiveTransform(Node node) {
viewOutputsToInputs.keySet().forEach(newPipeline.getComponentsBuilder()::removePcollections);
newPipeline.clearRootTransformIds();
newPipeline.addAllRootTransformIds(
- pipeline
- .getRootTransformIdsList()
- .stream()
+ pipeline.getRootTransformIdsList().stream()
.filter(id -> !viewTransforms.contains(id))
.collect(Collectors.toList()));
return newPipeline.build();
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 3197c694b62d..67b788764521 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -60,14 +60,16 @@
/** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessNaive}. */
public static class OverrideFactory<InputT, OutputT, RestrictionT>
implements PTransformOverrideFactory<
- PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple,
+ PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
+ PCollectionTuple,
ProcessKeyedElements<InputT, OutputT, RestrictionT>> {
@Override
public PTransformReplacement<
PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple,
+ PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
+ PCollectionTuple,
ProcessKeyedElements<InputT, OutputT, RestrictionT>>
transform) {
checkArgument(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index c40b4ce005bb..e42487ce692c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -132,8 +132,7 @@ public void validate() {
return ImmutableList.of(this);
}
List<? extends BoundedSource<T>> splits = boundedSource.split(desiredBundleSize, options);
- return splits
- .stream()
+ return splits.stream()
.map(input -> new BoundedToUnboundedSourceAdapter<>(input))
.collect(Collectors.toList());
} catch (Exception e) {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 4eb6bb0949e5..817f2d23c1bf 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -130,7 +130,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
public static <UserT, DestinationT, OutputT> FileBasedSink<UserT, DestinationT, OutputT> getSink(
AppliedPTransform<
- PCollection<UserT>, WriteFilesResult<DestinationT>,
+ PCollection<UserT>,
+ WriteFilesResult<DestinationT>,
? extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>>
transform)
throws IOException {
@@ -140,7 +141,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
public static <UserT, DestinationT> List<PCollectionView<?>> getDynamicDestinationSideInputs(
AppliedPTransform<
- PCollection<UserT>, WriteFilesResult<DestinationT>,
+ PCollection<UserT>,
+ WriteFilesResult<DestinationT>,
? extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>>
transform)
throws IOException {
@@ -167,7 +169,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
public static <T, DestinationT> boolean isWindowedWrites(
AppliedPTransform<
- PCollection<T>, WriteFilesResult<DestinationT>,
+ PCollection<T>,
+ WriteFilesResult<DestinationT>,
? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
transform)
throws IOException {
@@ -176,7 +179,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
public static <T, DestinationT> boolean isRunnerDeterminedSharding(
AppliedPTransform<
- PCollection<T>, WriteFilesResult<DestinationT>,
+ PCollection<T>,
+ WriteFilesResult<DestinationT>,
? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
transform)
throws IOException {
@@ -185,7 +189,8 @@ private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
private static <T, DestinationT> WriteFilesPayload getWriteFilesPayload(
AppliedPTransform<
- PCollection<T>, WriteFilesResult<DestinationT>,
+ PCollection<T>,
+ WriteFilesResult<DestinationT>,
? extends PTransform<PCollection<T>, WriteFilesResult<DestinationT>>>
transform)
throws IOException {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
index db8c63463568..8bdd71890574 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
@@ -183,8 +183,7 @@ default PTransform toPTransform(String uniqueName) {
.toBuilder()
.clearTransforms()
.putAllTransforms(
- getTransforms()
- .stream()
+ getTransforms().stream()
.collect(
Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform))));
@@ -214,33 +213,23 @@ static ExecutableStage fromPayload(ExecutableStagePayload payload) {
PipelineNode.pCollection(
payload.getInput(), components.getPcollectionsOrThrow(payload.getInput()));
List<SideInputReference> sideInputs =
- payload
- .getSideInputsList()
- .stream()
+ payload.getSideInputsList().stream()
.map(sideInputId -> SideInputReference.fromSideInputId(sideInputId, components))
.collect(Collectors.toList());
List<UserStateReference> userStates =
- payload
- .getUserStatesList()
- .stream()
+ payload.getUserStatesList().stream()
.map(userStateId -> UserStateReference.fromUserStateId(userStateId, components))
.collect(Collectors.toList());
List<TimerReference> timers =
- payload
- .getTimersList()
- .stream()
+ payload.getTimersList().stream()
.map(timerId -> TimerReference.fromTimerId(timerId, components))
.collect(Collectors.toList());
List<PTransformNode> transforms =
- payload
- .getTransformsList()
- .stream()
+ payload.getTransformsList().stream()
.map(id -> PipelineNode.pTransform(id, components.getTransformsOrThrow(id)))
.collect(Collectors.toList());
List<PCollectionNode> outputs =
- payload
- .getOutputsList()
- .stream()
+ payload.getOutputsList().stream()
.map(id -> PipelineNode.pCollection(id, components.getPcollectionsOrThrow(id)))
.collect(Collectors.toList());
return ImmutableExecutableStage.of(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
index facac67d57a7..e0d68289dd49 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
@@ -65,8 +65,7 @@ static FusedPipeline of(
Set<String> executableTransformIds =
Sets.union(
executableStageTransforms.keySet(),
- getRunnerExecutedTransforms()
- .stream()
+ getRunnerExecutedTransforms().stream()
.map(PTransformNode::getId)
.collect(Collectors.toSet()));
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index b52c83c5d8c8..e1c50916b705 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -214,9 +214,11 @@ private static boolean parDoCompatibility(
// upstream of any of the side inputs.
|| (pipeline.getSideInputs(parDo).isEmpty()
// We purposefully break fusion here to provide runners the opportunity to insert a
- // grouping operation to simplify implementing support for ParDo's that contain user state.
+ // grouping operation to simplify implementing support for ParDo's that contain user
+ // state.
// We would not need to do this if we had the ability to mark upstream transforms as
- // key preserving or if runners could execute ParDos containing user state in a distributed
+ // key preserving or if runners could execute ParDos containing user state in a
+ // distributed
// fashion for a single key.
&& pipeline.getUserStates(parDo).isEmpty()
// We purposefully break fusion here to provide runners the opportunity to insert a
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
index 640d402214d2..34c57e33af01 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuser.java
@@ -165,15 +165,13 @@ private FusedPipeline fusePipeline(
// as can compatible producers/consumers if a PCollection is only materialized once.
return FusedPipeline.of(
deduplicated.getDeduplicatedComponents(),
- stages
- .stream()
+ stages.stream()
.map(stage -> deduplicated.getDeduplicatedStages().getOrDefault(stage, stage))
.map(GreedyPipelineFuser::sanitizeDanglingPTransformInputs)
.collect(Collectors.toSet()),
Sets.union(
deduplicated.getIntroducedTransforms(),
- unfusedTransforms
- .stream()
+ unfusedTransforms.stream()
.map(
transform ->
deduplicated
@@ -306,8 +304,7 @@ static DescendantConsumers of(
pipeline.getEnvironment(newConsumer.consumingTransform()).get());
boolean foundSiblings = false;
for (Set<CollectionConsumer> existingConsumers : compatibleConsumers.get(key)) {
- if (existingConsumers
- .stream()
+ if (existingConsumers.stream()
.allMatch(
// The two consume the same PCollection and can exist in the same stage.
collectionConsumer ->
@@ -340,8 +337,7 @@ private ExecutableStage fuseSiblings(Set<CollectionConsumer> mutuallyCompatible)
return GreedyStageFuser.forGrpcPortRead(
pipeline,
rootCollection,
- mutuallyCompatible
- .stream()
+ mutuallyCompatible.stream()
.map(CollectionConsumer::consumingTransform)
.collect(Collectors.toSet()));
}
@@ -359,27 +355,19 @@ private static ExecutableStage sanitizeDanglingPTransformInputs(ExecutableStage
Set<String> possibleInputs = new HashSet<>();
possibleInputs.add(stage.getInputPCollection().getId());
possibleInputs.addAll(
- stage
- .getOutputPCollections()
- .stream()
+ stage.getOutputPCollections().stream()
.map(PCollectionNode::getId)
.collect(Collectors.toSet()));
possibleInputs.addAll(
- stage
- .getSideInputs()
- .stream()
+ stage.getSideInputs().stream()
.map(s -> s.collection().getId())
.collect(Collectors.toSet()));
possibleInputs.addAll(
- stage
- .getTransforms()
- .stream()
+ stage.getTransforms().stream()
.flatMap(t -> t.getTransform().getOutputsMap().values().stream())
.collect(Collectors.toSet()));
Set<String> danglingInputs =
- stage
- .getTransforms()
- .stream()
+ stage.getTransforms().stream()
.flatMap(t -> t.getTransform().getInputsMap().values().stream())
.filter(in -> !possibleInputs.contains(in))
.collect(Collectors.toSet());
@@ -388,10 +376,7 @@ private static ExecutableStage sanitizeDanglingPTransformInputs(ExecutableStage
for (PTransformNode transformNode : stage.getTransforms()) {
PTransform transform = transformNode.getTransform();
Map<String, String> validInputs =
- transform
- .getInputsMap()
- .entrySet()
- .stream()
+ transform.getInputsMap().entrySet().stream()
.filter(e -> !danglingInputs.contains(e.getValue()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
@@ -411,15 +396,10 @@ private static ExecutableStage sanitizeDanglingPTransformInputs(ExecutableStage
componentBuilder
.clearTransforms()
.putAllTransforms(
- pTransformNodes
- .stream()
+ pTransformNodes.stream()
.collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform)));
Map<String, PCollection> validPCollectionMap =
- stage
- .getComponents()
- .getPcollectionsMap()
- .entrySet()
- .stream()
+ stage.getComponents().getPcollectionsMap().entrySet().stream()
.filter(e -> !danglingInputs.contains(e.getKey()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
index 1f1c3832725c..9546d973c1ba 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
@@ -43,8 +43,7 @@ public static ImmutableExecutableStage ofFullComponents(
.toBuilder()
.clearTransforms()
.putAllTransforms(
- transforms
- .stream()
+ transforms.stream()
.collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform)))
.build();
return of(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java
index b2d277e4a462..c179372c05bb 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/Networks.java
@@ -208,11 +208,14 @@ public final NodeT apply(NodeT input) {
//
// The only edges that are ignored by the algorithm are back edges.
// The algorithm (while there are still nodes in the graph):
- // 1) Removes all sinks from the graph adding them to the beginning of "s2". Continue to do this till there
+ // 1) Removes all sinks from the graph adding them to the beginning of "s2". Continue to do
+ // this till there
// are no more sinks.
- // 2) Removes all source from the graph adding them to the end of "s1". Continue to do this till there
+ // 2) Removes all source from the graph adding them to the end of "s1". Continue to do this
+ // till there
// are no more sources.
- // 3) Remote a single node with the highest delta within the graph and add it to the end of "s1".
+ // 3) Remote a single node with the highest delta within the graph and add it to the end of
+ // "s1".
//
// The topological order is then the s1 concatenated with s2.
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
index 5a481901c8f3..f64d94d6f5a0 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
@@ -293,13 +293,10 @@ private static ExecutableStage deduplicateStageOutput(
.toBuilder()
.clearTransforms()
.putAllTransforms(
- updatedTransforms
- .stream()
+ updatedTransforms.stream()
.collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform)))
.putAllPcollections(
- originalToPartial
- .values()
- .stream()
+ originalToPartial.values().stream()
.collect(
Collectors.toMap(PCollectionNode::getId, PCollectionNode::getPCollection)))
.build();
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
index 710fbe2e4570..c4b12d75ed0a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
@@ -132,11 +132,15 @@ private QueryablePipeline(Collection<String> transformIds, Components components
PTransform transform = transformEntry.getValue();
boolean isPrimitive = isPrimitiveTransform(transform);
if (isPrimitive) {
- // Sometimes "primitive" transforms have sub-transforms (and even deeper-nested descendents), due to runners
- // either rewriting them in terms of runner-specific transforms, or SDKs constructing them in terms of other
+ // Sometimes "primitive" transforms have sub-transforms (and even deeper-nested
+ // descendents), due to runners
+ // either rewriting them in terms of runner-specific transforms, or SDKs constructing them
+ // in terms of other
// underlying transforms (see https://issues.apache.org/jira/browse/BEAM-5441).
- // We consider any "leaf" descendents of these "primitive" transforms to be the true "primitives" that we
- // preserve here; in the common case, this is just the "primitive" itself, which has no descendents).
+ // We consider any "leaf" descendents of these "primitive" transforms to be the true
+ // "primitives" that we
+ // preserve here; in the common case, this is just the "primitive" itself, which has no
+ // descendents).
Deque<String> transforms = new ArrayDeque<>();
transforms.push(transformEntry.getKey());
while (!transforms.isEmpty()) {
@@ -229,9 +233,7 @@ private static boolean isPrimitiveTransform(PTransform transform) {
}
public Collection<PTransformNode> getTransforms() {
- return pipelineNetwork
- .nodes()
- .stream()
+ return pipelineNetwork.nodes().stream()
.filter(PTransformNode.class::isInstance)
.map(PTransformNode.class::cast)
.collect(Collectors.toList());
@@ -252,9 +254,7 @@ private static boolean isPrimitiveTransform(PTransform transform) {
* have no input {@link PCollection}.
*/
public Set<PTransformNode> getRootTransforms() {
- return pipelineNetwork
- .nodes()
- .stream()
+ return pipelineNetwork.nodes().stream()
.filter(pipelineNode -> pipelineNetwork.inEdges(pipelineNode).isEmpty())
.map(pipelineNode -> (PTransformNode) pipelineNode)
.collect(Collectors.toSet());
@@ -276,14 +276,10 @@ public PTransformNode getProducer(PCollectionNode pcollection) {
* does consume the {@link PCollectionNode} on a per-element basis.
*/
public Set<PTransformNode> getPerElementConsumers(PCollectionNode pCollection) {
- return pipelineNetwork
- .successors(pCollection)
- .stream()
+ return pipelineNetwork.successors(pCollection).stream()
.filter(
consumer ->
- pipelineNetwork
- .edgesConnecting(pCollection, consumer)
- .stream()
+ pipelineNetwork.edgesConnecting(pCollection, consumer).stream()
.anyMatch(PipelineEdge::isPerElement))
.map(pipelineNode -> (PTransformNode) pipelineNode)
.collect(Collectors.toSet());
@@ -294,14 +290,10 @@ public PTransformNode getProducer(PCollectionNode pcollection) {
* the collection as a singleton.
*/
public Set<PTransformNode> getSingletonConsumers(PCollectionNode pCollection) {
- return pipelineNetwork
- .successors(pCollection)
- .stream()
+ return pipelineNetwork.successors(pCollection).stream()
.filter(
consumer ->
- pipelineNetwork
- .edgesConnecting(pCollection, consumer)
- .stream()
+ pipelineNetwork.edgesConnecting(pCollection, consumer).stream()
.anyMatch(edge -> !edge.isPerElement()))
.map(pipelineNode -> (PTransformNode) pipelineNode)
.collect(Collectors.toSet());
@@ -312,18 +304,14 @@ public PTransformNode getProducer(PCollectionNode pcollection) {
* per-element basis.
*/
public Set<PCollectionNode> getPerElementInputPCollections(PTransformNode ptransform) {
- return pipelineNetwork
- .inEdges(ptransform)
- .stream()
+ return pipelineNetwork.inEdges(ptransform).stream()
.filter(PipelineEdge::isPerElement)
.map(edge -> (PCollectionNode) pipelineNetwork.incidentNodes(edge).source())
.collect(Collectors.toSet());
}
public Set<PCollectionNode> getOutputPCollections(PTransformNode ptransform) {
- return pipelineNetwork
- .successors(ptransform)
- .stream()
+ return pipelineNetwork.successors(ptransform).stream()
.map(pipelineNode -> (PCollectionNode) pipelineNode)
.collect(Collectors.toSet());
}
@@ -337,8 +325,7 @@ public Components getComponents() {
* as side inputs.
*/
public Collection<SideInputReference> getSideInputs(PTransformNode transform) {
- return getLocalSideInputNames(transform.getTransform())
- .stream()
+ return getLocalSideInputNames(transform.getTransform()).stream()
.map(
localName -> {
String transformId = transform.getId();
@@ -354,8 +341,7 @@ public Components getComponents() {
}
public Collection<UserStateReference> getUserStates(PTransformNode transform) {
- return getLocalUserStateNames(transform.getTransform())
- .stream()
+ return getLocalUserStateNames(transform.getTransform()).stream()
.map(
localName -> {
String transformId = transform.getId();
@@ -382,8 +368,7 @@ public Components getComponents() {
}
public Collection<TimerReference> getTimers(PTransformNode transform) {
- return getLocalTimerNames(transform.getTransform())
- .stream()
+ return getLocalTimerNames(transform.getTransform()).stream()
.map(
localName -> {
String transformId = transform.getId();
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java
index 60525e991f87..ed1286ce6d35 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java
@@ -69,9 +69,7 @@ public void processElement(
}));
ExecutableStage firstEnvStage =
- GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p))
- .getFusedStages()
- .stream()
+ GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)).getFusedStages().stream()
.findFirst()
.get();
RunnerApi.ExecutableStagePayload basePayload =
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
index 25e7253a671d..13a954913a81 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
@@ -51,13 +51,15 @@
PCollection<? extends Integer>, PCollection<Integer>, MapElements<Integer, Integer>>
factory =
new SingleInputOutputOverrideFactory<
- PCollection<? extends Integer>, PCollection<Integer>,
+ PCollection<? extends Integer>,
+ PCollection<Integer>,
MapElements<Integer, Integer>>() {
@Override
public PTransformReplacement<PCollection<? extends Integer>, PCollection<Integer>>
getReplacementTransform(
AppliedPTransform<
- PCollection<? extends Integer>, PCollection<Integer>,
+ PCollection<? extends Integer>,
+ PCollection<Integer>,
MapElements<Integer, Integer>>
transform) {
return PTransformReplacement.of(
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java
index 1535d03e9e0f..3c477184717a 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java
@@ -105,8 +105,7 @@ protected boolean matchesSafely(ExecutableStage item) {
return item.getInputPCollection().getId().equals(inputPCollectionId)
&& containsInAnyOrder(sideInputIds.toArray())
.matches(
- item.getSideInputs()
- .stream()
+ item.getSideInputs().stream()
.map(
ref ->
SideInputId.newBuilder()
@@ -115,14 +114,12 @@ protected boolean matchesSafely(ExecutableStage item) {
.build())
.collect(Collectors.toSet()))
&& materializedPCollection.matches(
- item.getOutputPCollections()
- .stream()
+ item.getOutputPCollections().stream()
.map(PCollectionNode::getId)
.collect(Collectors.toSet()))
&& containsInAnyOrder(fusedTransforms.toArray(new String[0]))
.matches(
- item.getTransforms()
- .stream()
+ item.getTransforms().stream()
.map(PTransformNode::getId)
.collect(Collectors.toSet()));
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
index 5e681f2bb7cd..e329c00c2cf0 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
@@ -536,9 +536,7 @@ public void flattenWithHeterogenousInputsAndOutputsEntirelyMaterialized() {
.withNoOutputs()
.withTransforms("pyParDo")));
Set<String> materializedStageOutputs =
- fused
- .getFusedStages()
- .stream()
+ fused.getFusedStages().stream()
.flatMap(executableStage -> executableStage.getOutputPCollections().stream())
.map(PCollectionNode::getId)
.collect(Collectors.toSet());
@@ -1316,16 +1314,11 @@ public void sanitizedTransforms() throws Exception {
ExecutableStageMatcher.withInput(impulse2Output.getUniqueName())
.withTransforms(flattenTransform.getUniqueName(), read2Transform.getUniqueName())));
assertThat(
- fused
- .getFusedStages()
- .stream()
+ fused.getFusedStages().stream()
.flatMap(
s ->
- s.getComponents()
- .getTransformsOrThrow(flattenTransform.getUniqueName())
- .getInputsMap()
- .values()
- .stream())
+ s.getComponents().getTransformsOrThrow(flattenTransform.getUniqueName())
+ .getInputsMap().values().stream())
.collect(Collectors.toList()),
containsInAnyOrder(read1Output.getUniqueName(), read2Output.getUniqueName()));
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
index 1e20c73feb37..94ad3e8abd68 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
@@ -1165,9 +1165,7 @@ public void materializesWithGroupByKeyConsumer() {
protected boolean matchesSafely(ExecutableStage executableStage) {
// NOTE: Transform names must be unique, so it's fine to throw here if this does not hold.
Set<String> stageTransforms =
- executableStage
- .getTransforms()
- .stream()
+ executableStage.getTransforms().stream()
.map(PTransformNode::getId)
.collect(Collectors.toSet());
return stageTransforms.containsAll(expectedTransforms)
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java
index 065ad4e71361..a2a9991e2d47 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/NetworksTest.java
@@ -224,18 +224,14 @@ public void testNodeReplacement() {
MutableNetwork<String, String> originalNetwork = createNetwork();
for (String node : originalNetwork.nodes()) {
assertEquals(
- originalNetwork
- .successors(node)
- .stream()
+ originalNetwork.successors(node).stream()
.map(function)
.collect(Collectors.toCollection(HashSet::new)),
network.successors(function.apply(node)));
}
assertEquals(
network.nodes(),
- originalNetwork
- .nodes()
- .stream()
+ originalNetwork.nodes().stream()
.map(function)
.collect(Collectors.toCollection(HashSet::new)));
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java
index 624eff331402..023cc27cc7a6 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicatorTest.java
@@ -271,10 +271,7 @@ public void duplicateOverStages() {
assertThat(result.getDeduplicatedStages().keySet(), hasSize(2));
List<String> stageOutputs =
- result
- .getDeduplicatedStages()
- .values()
- .stream()
+ result.getDeduplicatedStages().values().stream()
.flatMap(stage -> stage.getOutputPCollections().stream().map(PCollectionNode::getId))
.collect(Collectors.toList());
assertThat(
@@ -398,11 +395,7 @@ public void duplicateOverStagesAndTransforms() {
introducedOutputs.addAll(
result.getDeduplicatedTransforms().get("shared").getTransform().getOutputsMap().values());
introducedOutputs.addAll(
- result
- .getDeduplicatedStages()
- .get(oneStage)
- .getOutputPCollections()
- .stream()
+ result.getDeduplicatedStages().get(oneStage).getOutputPCollections().stream()
.map(PCollectionNode::getId)
.collect(Collectors.toList()));
assertThat(
@@ -588,16 +581,11 @@ public void multipleDuplicatesInStages() {
assertThat(result.getDeduplicatedTransforms().keySet(), empty());
Collection<String> introducedIds =
- result
- .getIntroducedTransforms()
- .stream()
+ result.getIntroducedTransforms().stream()
.flatMap(pt -> pt.getTransform().getInputsMap().values().stream())
.collect(Collectors.toList());
String[] stageOutputs =
- result
- .getDeduplicatedStages()
- .values()
- .stream()
+ result.getDeduplicatedStages().values().stream()
.flatMap(s -> s.getOutputPCollections().stream().map(PCollectionNode::getId))
.toArray(String[]::new);
assertThat(introducedIds, containsInAnyOrder(stageOutputs));
@@ -608,9 +596,7 @@ public void multipleDuplicatesInStages() {
assertThat(
result.getDeduplicatedComponents().getTransformsMap().entrySet(),
hasItems(
- result
- .getIntroducedTransforms()
- .stream()
+ result.getIntroducedTransforms().stream()
.collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform))
.entrySet()
.toArray(new Map.Entry[0])));
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
index 055b66594386..c2399545ce57 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
@@ -210,10 +210,7 @@ public void transformWithSideAndMainInputs() {
PTransform parDoTransform = components.getTransformsOrThrow("par_do");
String sideInputLocalName =
getOnlyElement(
- parDoTransform
- .getInputsMap()
- .entrySet()
- .stream()
+ parDoTransform.getInputsMap().entrySet().stream()
.filter(entry -> !entry.getValue().equals(mainInputName))
.map(Map.Entry::getKey)
.collect(Collectors.toSet()));
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 790274833666..eafc9b82bfc0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -116,9 +116,7 @@ public LateDataFilter(
StreamSupport.stream(elements.spliterator(), false)
.map(
input ->
- input
- .getWindows()
- .stream()
+ input.getWindows().stream()
.map(
window ->
WindowedValue.of(
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 45c847edae57..93b9a7fddab3 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -104,14 +104,16 @@ public String getUrn() {
/** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */
public static class OverrideFactory<InputT, OutputT, RestrictionT>
implements PTransformOverrideFactory<
- PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple,
+ PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
+ PCollectionTuple,
ProcessKeyedElements<InputT, OutputT, RestrictionT>> {
@Override
public PTransformReplacement<
PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple,
+ PCollection<KV<byte[], KV<InputT, RestrictionT>>>,
+ PCollectionTuple,
ProcessKeyedElements<InputT, OutputT, RestrictionT>>
transform) {
return PTransformReplacement.of(
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index d5d10de83707..f03f71f1c619 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -540,8 +540,7 @@ public final void injectElements(List<TimestampedValue<InputT>> values) throws E
}
Iterable<WindowedValue<InputT>> inputs =
- values
- .stream()
+ values.stream()
.map(
input -> {
try {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index 0a9ec4935190..2bab79b1cb3d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -28,14 +28,16 @@
/** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */
class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
extends SingleInputOutputOverrideFactory<
- PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>,
+ PCollection<KV<KeyT, InputT>>,
+ PCollection<KeyedWorkItem<KeyT, InputT>>,
GBKIntoKeyedWorkItems<KeyT, InputT>> {
@Override
public PTransformReplacement<
PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>,
+ PCollection<KV<KeyT, InputT>>,
+ PCollection<KeyedWorkItem<KeyT, InputT>>,
GBKIntoKeyedWorkItems<KeyT, InputT>>
transform) {
return PTransformReplacement.of(
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index 49528160fa81..873eb1f4c884 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -30,13 +30,15 @@
/** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
final class DirectGroupByKeyOverrideFactory<K, V>
extends SingleInputOutputOverrideFactory<
- PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+ PCollection<KV<K, V>>,
+ PCollection<KV<K, Iterable<V>>>,
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
@Override
public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
+ PCollection<KV<K, V>>,
+ PCollection<KV<K, Iterable<V>>>,
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>>
transform) {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index a5a1c7658e1e..5b026cba3943 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -324,8 +324,7 @@ private void shutdownIfNecessary(State newState) {
"Error"
+ (errors.size() == 1 ? "" : "s")
+ " during executor shutdown:\n"
- + errors
- .stream()
+ + errors.stream()
.map(Exception::getMessage)
.collect(Collectors.joining("\n- ", "- ", "")));
visibleUpdates.failed(exception);
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 0a2d42f67c05..77aa9fc1826a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -81,7 +81,8 @@ public void cleanup() {}
private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
AppliedPTransform<
- PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
+ PCollection<KeyedWorkItem<K, V>>,
+ PCollection<KV<K, Iterable<V>>>,
DirectGroupAlsoByWindow<K, V>>
application,
CommittedBundle<KeyedWorkItem<K, V>> inputBundle) {
@@ -100,7 +101,8 @@ public void cleanup() {}
private final EvaluationContext evaluationContext;
private final PipelineOptions options;
private final AppliedPTransform<
- PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
+ PCollection<KeyedWorkItem<K, V>>,
+ PCollection<KV<K, Iterable<V>>>,
DirectGroupAlsoByWindow<K, V>>
application;
@@ -120,7 +122,8 @@ public GroupAlsoByWindowEvaluator(
PipelineOptions options,
CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
final AppliedPTransform<
- PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
+ PCollection<KeyedWorkItem<K, V>>,
+ PCollection<KV<K, Iterable<V>>>,
DirectGroupAlsoByWindow<K, V>>
application) {
this.evaluationContext = evaluationContext;
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
index d22ddb7fdebd..4f0594a4291a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/MultiStepCombine.java
@@ -118,7 +118,8 @@ public boolean matches(AppliedPTransform<?, ?, ?> application) {
static class Factory<K, InputT, AccumT, OutputT>
extends SingleInputOutputOverrideFactory<
- PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>,
+ PCollection<KV<K, InputT>>,
+ PCollection<KV<K, OutputT>>,
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
public static PTransformOverrideFactory create() {
return new Factory<>();
@@ -130,7 +131,8 @@ private Factory() {}
public PTransformReplacement<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>,
+ PCollection<KV<K, InputT>>,
+ PCollection<KV<K, OutputT>>,
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>>
transform) {
GlobalCombineFn<?, ?, ?> globalFn = ((Combine.PerKey) transform.getTransform()).getFn();
@@ -366,7 +368,8 @@ public MergeAndExtractAccumulatorOutputEvaluatorFactory(EvaluationContext ctxt)
private <K, AccumT, OutputT> TransformEvaluator<KV<K, Iterable<AccumT>>> createEvaluator(
AppliedPTransform<
- PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>,
+ PCollection<KV<K, Iterable<AccumT>>>,
+ PCollection<KV<K, OutputT>>,
MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>>
application,
CommittedBundle<KV<K, Iterable<AccumT>>> inputBundle) {
@@ -380,7 +383,8 @@ public void cleanup() throws Exception {}
private static class MergeAccumulatorsAndExtractOutputEvaluator<K, AccumT, OutputT>
implements TransformEvaluator<KV<K, Iterable<AccumT>>> {
private final AppliedPTransform<
- PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>,
+ PCollection<KV<K, Iterable<AccumT>>>,
+ PCollection<KV<K, OutputT>>,
MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>>
application;
private final CombineFn<?, AccumT, OutputT> combineFn;
@@ -389,7 +393,8 @@ public void cleanup() throws Exception {}
public MergeAccumulatorsAndExtractOutputEvaluator(
EvaluationContext ctxt,
AppliedPTransform<
- PCollection<KV<K, Iterable<AccumT>>>, PCollection<KV<K, OutputT>>,
+ PCollection<KV<K, Iterable<AccumT>>>,
+ PCollection<KV<K, OutputT>>,
MergeAndExtractAccumulatorOutput<K, AccumT, OutputT>>
application) {
this.application = application;
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 01bb26dfaa4c..c8ad2383659e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -112,9 +112,7 @@
evaluationContext.createSideInputReader(sideInputs);
Map<TupleTag<?>, Coder<?>> outputCoders =
- outputs
- .entrySet()
- .stream()
+ outputs.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getCoder()));
PushbackSideInputDoFnRunner<InputT, OutputT> runner =
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 112487851e8b..a11b384d7215 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -65,13 +65,15 @@
@VisibleForTesting
public class ParDoMultiOverrideFactory<InputT, OutputT>
implements PTransformOverrideFactory<
- PCollection<? extends InputT>, PCollectionTuple,
+ PCollection<? extends InputT>,
+ PCollectionTuple,
PTransform<PCollection<? extends InputT>, PCollectionTuple>> {
@Override
public PTransformReplacement<PCollection<? extends InputT>, PCollectionTuple>
getReplacementTransform(
AppliedPTransform<
- PCollection<? extends InputT>, PCollectionTuple,
+ PCollection<? extends InputT>,
+ PCollectionTuple,
PTransform<PCollection<? extends InputT>, PCollectionTuple>>
application) {
@@ -87,7 +89,8 @@
@SuppressWarnings("unchecked")
private PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementForApplication(
AppliedPTransform<
- PCollection<? extends InputT>, PCollectionTuple,
+ PCollection<? extends InputT>,
+ PCollectionTuple,
PTransform<PCollection<? extends InputT>, PCollectionTuple>>
application)
throws IOException {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 737098f44738..86820ef2457b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -103,7 +103,8 @@ public void cleanup() throws Exception {
@SuppressWarnings({"unchecked", "rawtypes"})
private TransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> createEvaluator(
AppliedPTransform<
- PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple,
+ PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>,
+ PCollectionTuple,
ProcessElements<InputT, OutputT, RestrictionT, PositionT>>
application,
CommittedBundle<InputT> inputBundle)
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index a12d71b865f6..bd05a5bcaca9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -106,7 +106,8 @@ public void cleanup() throws Exception {
@SuppressWarnings({"unchecked", "rawtypes"})
private TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> createEvaluator(
AppliedPTransform<
- PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple,
+ PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>,
+ PCollectionTuple,
StatefulParDo<K, InputT, OutputT>>
application,
CommittedBundle<KeyedWorkItem<K, KV<K, InputT>>> inputBundle)
@@ -203,7 +204,8 @@ public Runnable load(
@AutoValue
abstract static class AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> {
abstract AppliedPTransform<
- PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple,
+ PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>,
+ PCollectionTuple,
StatefulParDo<K, InputT, OutputT>>
getTransform();
@@ -213,7 +215,8 @@ public Runnable load(
static <K, InputT, OutputT> AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> create(
AppliedPTransform<
- PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple,
+ PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>,
+ PCollectionTuple,
StatefulParDo<K, InputT, OutputT>>
transform,
StructuralKey<K> key,
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 6f5eff614a7d..397b0675f640 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -42,13 +42,15 @@
*/
class ViewOverrideFactory<ElemT, ViewT>
implements PTransformOverrideFactory<
- PCollection<ElemT>, PCollection<ElemT>,
+ PCollection<ElemT>,
+ PCollection<ElemT>,
PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
@Override
public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
AppliedPTransform<
- PCollection<ElemT>, PCollection<ElemT>,
+ PCollection<ElemT>,
+ PCollection<ElemT>,
PTransform<PCollection<ElemT>, PCollection<ElemT>>>
transform) {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 4d5a2d9f2123..62cc673f1953 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -51,7 +51,8 @@
*/
class WriteWithShardingFactory<InputT, DestinationT>
implements PTransformOverrideFactory<
- PCollection<InputT>, WriteFilesResult<DestinationT>,
+ PCollection<InputT>,
+ WriteFilesResult<DestinationT>,
PTransform<PCollection<InputT>, WriteFilesResult<DestinationT>>> {
static final int MAX_RANDOM_EXTRA_SHARDS = 3;
@VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;
@@ -60,7 +61,8 @@
public PTransformReplacement<PCollection<InputT>, WriteFilesResult<DestinationT>>
getReplacementTransform(
AppliedPTransform<
- PCollection<InputT>, WriteFilesResult<DestinationT>,
+ PCollection<InputT>,
+ WriteFilesResult<DestinationT>,
PTransform<PCollection<InputT>, WriteFilesResult<DestinationT>>>
transform) {
try {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
index 55282534e40c..e4442108f123 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
@@ -317,8 +317,7 @@ private void shutdownIfNecessary(State newState) {
"Error"
+ (errors.size() == 1 ? "" : "s")
+ " during executor shutdown:\n"
- + errors
- .stream()
+ + errors.stream()
.map(Exception::getMessage)
.collect(Collectors.joining("\n- ", "- ", "")));
visibleUpdates.failed(exception);
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
index 8c6873a422e7..2c2c9f07425c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -479,8 +479,7 @@ private static Pipeline foldFeedSDFIntoExecutableStage(Pipeline p) {
QueryablePipeline q = QueryablePipeline.forPipeline(p);
String feedSdfUrn = SplittableRemoteStageEvaluatorFactory.FEED_SDF_URN;
List<PTransformNode> feedSDFNodes =
- q.getTransforms()
- .stream()
+ q.getTransforms().stream()
.filter(node -> node.getTransform().getSpec().getUrn().equals(feedSdfUrn))
.collect(Collectors.toList());
Map<String, PTransformNode> stageToFeeder = Maps.newHashMap();
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
index 77b1028844c2..8e1514f2fd62 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/job/ReferenceRunnerJobServer.java
@@ -124,10 +124,9 @@ public void stop() {
private static class ServerConfiguration {
@Option(
- name = "-p",
- aliases = {"--port"},
- usage = "The local port to expose the server on. 0 to use a dynamic port. (Default: 8099)"
- )
+ name = "-p",
+ aliases = {"--port"},
+ usage = "The local port to expose the server on. 0 to use a dynamic port. (Default: 8099)")
private int port = 8099;
}
}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index eac8554804b7..d3ede31814e9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -326,7 +326,8 @@ public void onElement(final ProcessContext ctx) {
@Teardown
public void teardown() {
- // just to not have a fast execution hiding an issue until we have a shutdown callback
+ // just to not have a fast execution hiding an issue until we have a shutdown
+ // callback
try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index a9673bf716ca..54018cfdbf46 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -148,7 +148,8 @@ public void process(ProcessContext c) {}
new StatefulParDoEvaluatorFactory<>(mockEvaluationContext, options);
AppliedPTransform<
- PCollection<? extends KeyedWorkItem<String, KV<String, Integer>>>, PCollectionTuple,
+ PCollection<? extends KeyedWorkItem<String, KV<String, Integer>>>,
+ PCollectionTuple,
StatefulParDo<String, Integer, Integer>>
producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced);
@@ -256,7 +257,8 @@ public void process(ProcessContext c) {}
// This will be the stateful ParDo from the expansion
AppliedPTransform<
- PCollection<KeyedWorkItem<String, KV<String, Integer>>>, PCollectionTuple,
+ PCollection<KeyedWorkItem<String, KV<String, Integer>>>,
+ PCollectionTuple,
StatefulParDo<String, Integer, Integer>>
producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced);
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 5ec2fd9c28f1..684ac0223ba9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -151,7 +151,8 @@ public void withNoShardingSpecifiedReturnsNewTransform() {
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
AppliedPTransform<
- PCollection<Object>, WriteFilesResult<Void>,
+ PCollection<Object>,
+ WriteFilesResult<Void>,
PTransform<PCollection<Object>, WriteFilesResult<Void>>>
originalApplication =
AppliedPTransform.of("write", objs.expand(), Collections.emptyMap(), original, p);
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
index f16aaea6f0ac..fd55e85d4c43 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
@@ -157,9 +157,7 @@ public void proc(ProcessContext ctxt) {
PTransformNode impulseTransform = getOnlyElement(fusedQP.getRootTransforms());
PCollectionNode impulseOutput = getOnlyElement(fusedQP.getOutputPCollections(impulseTransform));
PTransformNode stage =
- fusedPipeline
- .getRootTransformIdsList()
- .stream()
+ fusedPipeline.getRootTransformIdsList().stream()
.map(
id ->
PipelineNode.pTransform(
@@ -212,9 +210,7 @@ public void process(ProcessContext ctxt) {
checkState(leftRoot != null);
checkState(rightRoot != null);
PTransformNode stage =
- fusedPipeline
- .getRootTransformIdsList()
- .stream()
+ fusedPipeline.getRootTransformIdsList().stream()
.map(
id ->
PipelineNode.pTransform(
diff --git a/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java
index 2d1d94b16d76..c3af72c3ec80 100644
--- a/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java
+++ b/runners/extensions-java/metrics/src/main/java/org/apache/beam/runners/extensions/metrics/MetricsGraphiteSink.java
@@ -128,9 +128,8 @@ private CounterMetricMessage(
}
@SuppressFBWarnings(
- value = "VA_FORMAT_STRING_USES_NEWLINE",
- justification = "\\n is part of graphite protocol"
- )
+ value = "VA_FORMAT_STRING_USES_NEWLINE",
+ justification = "\\n is part of graphite protocol")
@Override
protected String createCommittedMessage() {
String metricMessage =
@@ -145,9 +144,8 @@ protected String createCommittedMessage() {
}
@SuppressFBWarnings(
- value = "VA_FORMAT_STRING_USES_NEWLINE",
- justification = "\\n is part of graphite protocol"
- )
+ value = "VA_FORMAT_STRING_USES_NEWLINE",
+ justification = "\\n is part of graphite protocol")
@Override
protected String createAttemptedMessage() {
String metricMessage =
@@ -172,9 +170,8 @@ private GaugeMetricMessage(MetricResult<GaugeResult> gauge, String valueType) {
}
@SuppressFBWarnings(
- value = "VA_FORMAT_STRING_USES_NEWLINE",
- justification = "\\n is part of graphite protocol"
- )
+ value = "VA_FORMAT_STRING_USES_NEWLINE",
+ justification = "\\n is part of graphite protocol")
@Override
protected String createCommittedMessage() {
String metricMessage =
@@ -188,9 +185,8 @@ protected String createCommittedMessage() {
}
@SuppressFBWarnings(
- value = "VA_FORMAT_STRING_USES_NEWLINE",
- justification = "\\n is part of graphite protocol"
- )
+ value = "VA_FORMAT_STRING_USES_NEWLINE",
+ justification = "\\n is part of graphite protocol")
@Override
protected String createAttemptedMessage() {
String metricMessage =
@@ -218,9 +214,8 @@ public DistributionMetricMessage(
}
@SuppressFBWarnings(
- value = "VA_FORMAT_STRING_USES_NEWLINE",
- justification = "\\n is part of graphite protocol"
- )
+ value = "VA_FORMAT_STRING_USES_NEWLINE",
+ justification = "\\n is part of graphite protocol")
@Override
protected String createCommittedMessage() {
Number value = null;
@@ -255,9 +250,8 @@ protected String createCommittedMessage() {
}
@SuppressFBWarnings(
- value = "VA_FORMAT_STRING_USES_NEWLINE",
- justification = "\\n is part of graphite protocol"
- )
+ value = "VA_FORMAT_STRING_USES_NEWLINE",
+ justification = "\\n is part of graphite protocol")
@Override
protected String createAttemptedMessage() {
Number value = null;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
index 96f4b5b15f94..df067077d8bd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
@@ -135,14 +135,16 @@ private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
public static class Factory<ElemT, ViewT>
implements PTransformOverrideFactory<
- PCollection<ElemT>, PCollection<ElemT>,
+ PCollection<ElemT>,
+ PCollection<ElemT>,
PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
public Factory() {}
@Override
public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
AppliedPTransform<
- PCollection<ElemT>, PCollection<ElemT>,
+ PCollection<ElemT>,
+ PCollection<ElemT>,
PTransform<PCollection<ElemT>, PCollection<ElemT>>>
transform) {
PCollection<ElemT> collection =
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index d2557e8b239e..8f9fcb9ade4d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -296,7 +296,7 @@ public boolean test(RunnerApi.PTransform pTransform) {
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(e);
}
- //TODO: https://issues.apache.org/jira/browse/BEAM-4296
+ // TODO: https://issues.apache.org/jira/browse/BEAM-4296
// This only works for well known window fns, we should defer this execution to the SDK
// if the WindowFn can't be parsed or just defer it all the time.
WindowFn<T, ? extends BoundedWindow> windowFn =
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index a688bee5f3cc..271ef0bcc4d3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -707,11 +707,13 @@ public void translateNode(
@SuppressWarnings("unchecked")
AppliedPTransform<
- PCollection<ElemT>, PCollection<ElemT>,
+ PCollection<ElemT>,
+ PCollection<ElemT>,
PTransform<PCollection<ElemT>, PCollection<ElemT>>>
application =
(AppliedPTransform<
- PCollection<ElemT>, PCollection<ElemT>,
+ PCollection<ElemT>,
+ PCollection<ElemT>,
PTransform<PCollection<ElemT>, PCollection<ElemT>>>)
context.getCurrentTransform();
PCollectionView<ViewT> input;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
index 7222a3037a38..466069f8ea31 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -110,10 +110,7 @@ public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
}
public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
- return currentTransform
- .getOutputs()
- .entrySet()
- .stream()
+ return currentTransform.getOutputs().entrySet().stream()
.filter(e -> e.getValue() instanceof PCollection)
.collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 37dfb0c0ecb0..3c69817fa5ee 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -99,7 +99,8 @@ static ExecutionEnvironment createBatchExecutionEnvironment(
if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) {
flinkBatchEnv.setParallelism(options.getParallelism());
}
- // Set the correct parallelism, required by UnboundedSourceWrapper to generate consistent splits.
+ // Set the correct parallelism, required by UnboundedSourceWrapper to generate consistent
+ // splits.
final int parallelism;
if (flinkBatchEnv instanceof CollectionEnvironment) {
parallelism = 1;
@@ -267,7 +268,8 @@ private static int determineParallelism(
return pipelineOptionsParallelism;
}
if (envParallelism > 0) {
- // If the user supplies a parallelism on the command-line, this is set on the execution environment during creation
+ // If the user supplies a parallelism on the command-line, this is set on the execution
+ // environment during creation
return envParallelism;
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index c00f14c8f234..efaa40aa4ea2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -282,8 +282,7 @@ private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) {
Collection<RunnerApi.PCollection> pCollecctions =
pipeline.getComponents().getPcollectionsMap().values();
// Assume that all PCollections are consumed at some point in the pipeline.
- return pCollecctions
- .stream()
+ return pCollecctions.stream()
.anyMatch(pc -> pc.getIsBounded() == RunnerApi.IsBounded.Enum.UNBOUNDED);
}
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 40e815640581..e40b91f80539 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -58,15 +58,13 @@
String host = "localhost";
@Option(
- name = "--job-port",
- usage = "The job service port. 0 to use a dynamic port. (Default: 8099)"
- )
+ name = "--job-port",
+ usage = "The job service port. 0 to use a dynamic port. (Default: 8099)")
int port = 8099;
@Option(
- name = "--artifact-port",
- usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)"
- )
+ name = "--artifact-port",
+ usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)")
int artifactPort = 8098;
@Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
@@ -74,9 +72,8 @@
Paths.get(System.getProperty("java.io.tmpdir"), "beam-artifact-staging").toString();
@Option(
- name = "--clean-artifacts-per-job",
- usage = "When true, remove each job's staged artifacts when it completes"
- )
+ name = "--clean-artifacts-per-job",
+ usage = "When true, remove each job's staged artifacts when it completes")
boolean cleanArtifactsPerJob = false;
@Option(name = "--flink-master-url", usage = "Flink master url to submit job.")
@@ -87,9 +84,8 @@ String getFlinkMasterUrl() {
}
@Option(
- name = "--sdk-worker-parallelism",
- usage = "Default parallelism for SDK worker processes (see portable pipeline options)"
- )
+ name = "--sdk-worker-parallelism",
+ usage = "Default parallelism for SDK worker processes (see portable pipeline options)")
Long sdkWorkerParallelism = 1L;
Long getSdkWorkerParallelism() {
@@ -97,12 +93,11 @@ Long getSdkWorkerParallelism() {
}
@Option(
- name = "--flink-conf-dir",
- usage =
- "Directory containing Flink YAML configuration files. "
- + "These properties will be set to all jobs submitted to Flink and take precedence "
- + "over configurations in FLINK_CONF_DIR."
- )
+ name = "--flink-conf-dir",
+ usage =
+ "Directory containing Flink YAML configuration files. "
+ + "These properties will be set to all jobs submitted to Flink and take precedence "
+ + "over configurations in FLINK_CONF_DIR.")
String flinkConfDir = null;
@Nullable
@@ -112,7 +107,7 @@ String getFlinkConfDir() {
}
public static void main(String[] args) throws Exception {
- //TODO: Expose the fileSystem related options.
+ // TODO: Expose the fileSystem related options.
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
fromParams(args).run();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 6126af5303f5..83776e170864 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -172,7 +172,8 @@ boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
@VisibleForTesting
static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
implements PTransformOverrideFactory<
- PCollection<UserT>, WriteFilesResult<DestinationT>,
+ PCollection<UserT>,
+ WriteFilesResult<DestinationT>,
WriteFiles<UserT, DestinationT, OutputT>> {
FlinkPipelineOptions options;
@@ -184,7 +185,8 @@ boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
public PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>>
getReplacementTransform(
AppliedPTransform<
- PCollection<UserT>, WriteFilesResult<DestinationT>,
+ PCollection<UserT>,
+ WriteFilesResult<DestinationT>,
WriteFiles<UserT, DestinationT, OutputT>>
transform) {
// By default, if numShards is not set WriteFiles will produce one file per bundle. In
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 80ed3601fe26..8cd195b53103 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -489,7 +489,7 @@ private void translateStreamingImpulse(
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(e);
}
- //TODO: https://issues.apache.org/jira/browse/BEAM-4296
+ // TODO: https://issues.apache.org/jira/browse/BEAM-4296
// This only works for well known window fns, we should defer this execution to the SDK
// if the WindowFn can't be parsed or just defer it all the time.
WindowFn<T, ? extends BoundedWindow> windowFn =
@@ -695,7 +695,8 @@ private void translateStreamingImpulse(
for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
stagePayload.getSideInputsList()) {
- // TODO: local name is unique as long as only one transform with side input can be within a stage
+ // TODO: local name is unique as long as only one transform with side input can be within a
+ // stage
String sideInputTag = sideInputId.getLocalName();
String collectionId =
components
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index c770cdf790eb..e23e67fa03ac 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -992,7 +992,8 @@ public void translateNode(
// allowed to have only one input keyed, normally.
TwoInputTransformation<
- WindowedValue<SingletonKeyedWorkItem<K, InputT>>, RawUnionValue,
+ WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
+ RawUnionValue,
WindowedValue<KV<K, OutputT>>>
rawFlinkTransform =
new TwoInputTransformation<>(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index 00d1ea867feb..027b07c36156 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -103,10 +103,7 @@ public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
}
public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
- return currentTransform
- .getOutputs()
- .entrySet()
- .stream()
+ return currentTransform.getOutputs().entrySet().stream()
.filter(e -> e.getValue() instanceof PCollection)
.collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
index 32ec4ca6229b..7eb8b23ef139 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
@@ -59,10 +59,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
}
private boolean hasUnboundedOutput(AppliedPTransform<?, ?, ?> transform) {
- return transform
- .getOutputs()
- .values()
- .stream()
+ return transform.getOutputs().values().stream()
.filter(value -> value instanceof PCollection)
.map(value -> (PCollection<?>) value)
.anyMatch(collection -> collection.isBounded() == IsBounded.UNBOUNDED);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
index 995936ecb191..ccad6742e7bb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
@@ -85,7 +85,8 @@ private JobFactoryState(int maxFactories) {
Preconditions.checkArgument(maxFactories >= 0, "sdk_worker_parallelism must be >= 0");
if (maxFactories == 0) {
- // if this is 0, use the auto behavior of num_cores - 1 so that we leave some resources available for the java process
+ // if this is 0, use the auto behavior of num_cores - 1 so that we leave some resources
+ // available for the java process
this.maxFactories = Math.max(Runtime.getRuntime().availableProcessors() - 1, 1);
} else {
this.maxFactories = maxFactories;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index bc51a106d7bd..e7dafa8968f8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -332,7 +332,7 @@ public void close() throws Exception {
// close may be called multiple times when an exception is thrown
if (stageContext != null) {
try (AutoCloseable bundleFactoryCloser = stageBundleFactory;
- AutoCloseable closable = stageContext) {
+ AutoCloseable closable = stageContext) {
} catch (Exception e) {
LOG.error("Error in close: ", e);
throw e;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
index c211403c2d4e..e404298d9ae9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
@@ -86,9 +86,7 @@ public SideInputInitializer(PCollectionView<ViewT> view) {
InMemoryMultimapSideInputView.fromIterable(
keyCoder,
(Iterable)
- elements
- .getValue()
- .stream()
+ elements.getValue().stream()
.map(WindowedValue::getValue)
.collect(Collectors.toList()))));
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index d1ef494fc2e1..eef86d7728e8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -354,7 +354,8 @@ private void setTimer(WindowedValue<InputT> timerElement, TimerInternals.TimerDa
try {
Object key = keySelector.getKey(timerElement);
sdkHarnessRunner.setCurrentTimerKey(key);
- // We have to synchronize to ensure the state backend is not concurrently accessed by the state requests
+ // We have to synchronize to ensure the state backend is not concurrently accessed by the
+ // state requests
try {
stateBackendLock.lock();
getKeyedStateBackend().setCurrentKey(key);
@@ -385,7 +386,8 @@ public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
}
// Prepare the SdkHarnessRunner with the key for the timer
sdkHarnessRunner.setCurrentTimerKey(decodedKey);
- // We have to synchronize to ensure the state backend is not concurrently accessed by the state requests
+ // We have to synchronize to ensure the state backend is not concurrently accessed by the state
+ // requests
try {
stateBackendLock.lock();
getKeyedStateBackend().setCurrentKey(encodedKey);
@@ -399,9 +401,10 @@ public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
public void dispose() throws Exception {
// may be called multiple times when an exception is thrown
if (stageContext != null) {
- // Remove the reference to stageContext and make stageContext available for garbage collection.
+ // Remove the reference to stageContext and make stageContext available for garbage
+ // collection.
try (AutoCloseable bundleFactoryCloser = stageBundleFactory;
- AutoCloseable closable = stageContext) {
+ AutoCloseable closable = stageContext) {
// DoFnOperator generates another "bundle" for the final watermark
// https://issues.apache.org/jira/browse/BEAM-5816
super.dispose();
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
index a1ccca2fd2e0..ede1b5cb2b57 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapperTest.java
@@ -303,9 +303,7 @@ public void close() {}
sourceThread.start();
- while (flinkWrapper
- .getLocalReaders()
- .stream()
+ while (flinkWrapper.getLocalReaders().stream()
.anyMatch(reader -> reader.getWatermark().getMillis() == 0)) {
// readers haven't been initialized
Thread.sleep(50);
@@ -631,7 +629,8 @@ private static void testSourceDoesNotShutdown(boolean shouldHaveReaders) throws
SourceFunction.SourceContext sourceContext = Mockito.mock(SourceFunction.SourceContext.class);
Object checkpointLock = new Object();
Mockito.when(sourceContext.getCheckpointLock()).thenReturn(checkpointLock);
- // Initialize source context early to avoid concurrency issues with its initialization in the run
+ // Initialize source context early to avoid concurrency issues with its initialization in the
+ // run
// method and the onProcessingTime call on the wrapper.
sourceWrapper.setSourceContext(sourceContext);
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
index ffe92fa5d05f..ff0590b112a4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
@@ -69,7 +69,8 @@
*/
public static <K, InputT, OutputT>
PTransformOverrideFactory<
- PCollection<KV<K, InputT>>, PCollection<OutputT>,
+ PCollection<KV<K, InputT>>,
+ PCollection<OutputT>,
ParDo.SingleOutput<KV<K, InputT>, OutputT>>
singleOutputOverrideFactory(DataflowPipelineOptions options) {
return new SingleOutputOverrideFactory<>(isFnApi(options));
@@ -81,7 +82,8 @@
*/
public static <K, InputT, OutputT>
PTransformOverrideFactory<
- PCollection<KV<K, InputT>>, PCollectionTuple,
+ PCollection<KV<K, InputT>>,
+ PCollectionTuple,
ParDo.MultiOutput<KV<K, InputT>, OutputT>>
multiOutputOverrideFactory(DataflowPipelineOptions options) {
return new MultiOutputOverrideFactory<>(isFnApi(options));
@@ -94,7 +96,8 @@ private static boolean isFnApi(DataflowPipelineOptions options) {
private static class SingleOutputOverrideFactory<K, InputT, OutputT>
implements PTransformOverrideFactory<
- PCollection<KV<K, InputT>>, PCollection<OutputT>,
+ PCollection<KV<K, InputT>>,
+ PCollection<OutputT>,
ParDo.SingleOutput<KV<K, InputT>, OutputT>> {
private final boolean isFnApi;
@@ -107,7 +110,8 @@ private SingleOutputOverrideFactory(boolean isFnApi) {
public PTransformReplacement<PCollection<KV<K, InputT>>, PCollection<OutputT>>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<K, InputT>>, PCollection<OutputT>,
+ PCollection<KV<K, InputT>>,
+ PCollection<OutputT>,
SingleOutput<KV<K, InputT>, OutputT>>
transform) {
return PTransformReplacement.of(
@@ -136,7 +140,8 @@ private MultiOutputOverrideFactory(boolean isFnApi) {
public PTransformReplacement<PCollection<KV<K, InputT>>, PCollectionTuple>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<K, InputT>>, PCollectionTuple,
+ PCollection<KV<K, InputT>>,
+ PCollectionTuple,
MultiOutput<KV<K, InputT>, OutputT>>
transform) {
return PTransformReplacement.of(
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 0db5892c656d..e93a15c893e0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -882,10 +882,7 @@ public void translate(ParDo.MultiOutput transform, TranslationContext context) {
ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
Map<TupleTag<?>, Coder<?>> outputCoders =
- context
- .getOutputs(transform)
- .entrySet()
- .stream()
+ context.getOutputs(transform).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
@@ -920,10 +917,7 @@ public void translate(ParDoSingle transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
Map<TupleTag<?>, Coder<?>> outputCoders =
- context
- .getOutputs(transform)
- .entrySet()
- .stream()
+ context.getOutputs(transform).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
@@ -991,10 +985,7 @@ public void translate(
StepTranslationContext stepContext =
context.addStep(transform, "SplittableProcessKeyed");
Map<TupleTag<?>, Coder<?>> outputCoders =
- context
- .getOutputs(transform)
- .entrySet()
- .stream()
+ context.getOutputs(transform).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 774614180783..1a7ae3c28fcb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -788,8 +788,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
if (!isNullOrEmpty(dataflowOptions.getMinCpuPlatform())) {
List<String> minCpuFlags =
- experiments
- .stream()
+ experiments.stream()
.filter(p -> p.startsWith("min_cpu_platform"))
.collect(Collectors.toList());
@@ -1677,13 +1676,15 @@ private String getJobIdFromName(String jobName) {
private static class PrimitiveCombineGroupedValuesOverrideFactory<K, InputT, OutputT>
implements PTransformOverrideFactory<
- PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>,
+ PCollection<KV<K, Iterable<InputT>>>,
+ PCollection<KV<K, OutputT>>,
Combine.GroupedValues<K, InputT, OutputT>> {
@Override
public PTransformReplacement<PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<K, Iterable<InputT>>>, PCollection<KV<K, OutputT>>,
+ PCollection<KV<K, Iterable<InputT>>>,
+ PCollection<KV<K, OutputT>>,
GroupedValues<K, InputT, OutputT>>
transform) {
return PTransformReplacement.of(
@@ -1726,7 +1727,8 @@ private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) {
@VisibleForTesting
static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
implements PTransformOverrideFactory<
- PCollection<UserT>, WriteFilesResult<DestinationT>,
+ PCollection<UserT>,
+ WriteFilesResult<DestinationT>,
WriteFiles<UserT, DestinationT, OutputT>> {
// We pick 10 as a a default, as it works well with the default number of workers started
// by Dataflow.
@@ -1741,7 +1743,8 @@ private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) {
public PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>>
getReplacementTransform(
AppliedPTransform<
- PCollection<UserT>, WriteFilesResult<DestinationT>,
+ PCollection<UserT>,
+ WriteFilesResult<DestinationT>,
WriteFiles<UserT, DestinationT, OutputT>>
transform) {
// By default, if numShards is not set WriteFiles will produce one file per bundle. In
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 29c1699c75ed..d9758c625aee 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -66,7 +66,8 @@
public PTransformReplacement<PCollection<? extends InputT>, PCollection<OutputT>>
getReplacementTransform(
AppliedPTransform<
- PCollection<? extends InputT>, PCollection<OutputT>,
+ PCollection<? extends InputT>,
+ PCollection<OutputT>,
SingleOutput<InputT, OutputT>>
transform) {
return PTransformReplacement.of(
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
index ef238b26cb2f..40a05f3f9330 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
@@ -49,7 +49,7 @@
private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);
private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) {
- int cores = 4; //TODO: decide at runtime?
+ int cores = 4; // TODO: decide at runtime?
if (options.getMaxNumWorkers() > 0) {
return options.getMaxNumWorkers() * cores;
} else if (options.getNumWorkers() > 0) {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
index 4d17b6f74d60..6b62fac6094c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java
@@ -75,7 +75,8 @@ private static ApiComponents apiComponentsFromUrl(String urlString) {
getJsonFactory(),
chainHttpRequestInitializer(
options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
+ // Do not log 404. It clutters the output and is possibly even required by the
+ // caller.
new RetryHttpRequestInitializer(ImmutableList.of(404))))
.setApplicationName(options.getAppName())
.setRootUrl(components.rootUrl)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 7e9a1d2f32e8..e52c021884f9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -339,8 +339,7 @@ public Node typedApply(ExecutableStageNode input) {
Iterables.filter(network.successors(input), OutputReceiverNode.class);
Map<String, OutputReceiver> outputReceiverMap = new HashMap<>();
- Lists.newArrayList(outputReceiverNodes)
- .stream()
+ Lists.newArrayList(outputReceiverNodes).stream()
.forEach(
outputReceiverNode ->
outputReceiverMap.put(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
index 392941f8bf05..aefffee3f301 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
@@ -74,6 +74,7 @@
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* A {@link WorkExecutor} that processes a list of {@link Operation}s.
*
@@ -489,8 +490,7 @@ private void updateMetrics(List<MonitoringInfo> monitoringInfos) {
bundleProcessOperation.getPtransformIdToUserStepContext());
counterUpdates =
- monitoringInfos
- .stream()
+ monitoringInfos.stream()
.map(monitoringInfoToCounterUpdateTransformer::monitoringInfoToCounterUpdate)
.filter(Objects::nonNull)
.collect(Collectors.toList());
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index dfb2047a5764..73b995bf754f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -236,7 +236,8 @@ public Node apply(MutableNetwork<Node, Edge> input) {
nodesToPCollections.put(node, pcollectionId);
componentsBuilder.putPcollections(pcollectionId, pCollection);
- // Check whether this output collection has consumers from worker side when "use_executable_stage_bundle_execution"
+ // Check whether this output collection has consumers from worker side when
+ // "use_executable_stage_bundle_execution"
// is set
if (input.successors(node).stream().anyMatch(RemoteGrpcPortNode.class::isInstance)) {
executableStageOutputs.add(PipelineNode.pCollection(pcollectionId, pCollection));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java
index b2913c92962e..9ad594ec65bf 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/status/ThreadzServlet.java
@@ -109,9 +109,7 @@ public void captureData(PrintWriter writer) {
// Then, print out each stack along with the threads that share it. Stacks with more threads
// are printed first.
- stacks
- .entrySet()
- .stream()
+ stacks.entrySet().stream()
.sorted(Comparator.comparingInt(e -> -e.getValue().size()))
.forEachOrdered(
entry -> {
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java
index cf82391b5317..79d92ba8378f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NetworksTest.java
@@ -150,9 +150,7 @@ public String apply(String input) {
MutableNetwork<String, String> originalNetwork = createNetwork();
for (String node : originalNetwork.nodes()) {
assertEquals(
- originalNetwork
- .successors(node)
- .stream()
+ originalNetwork.successors(node).stream()
.map(function)
.collect(Collectors.toCollection(HashSet::new)),
network.successors(function.apply(node)));
@@ -161,9 +159,7 @@ public String apply(String input) {
}
assertEquals(
network.nodes(),
- originalNetwork
- .nodes()
- .stream()
+ originalNetwork.nodes().stream()
.map(function)
.collect(Collectors.toCollection(HashSet::new)));
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
index 1b2d16187307..967f8fc0f808 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
@@ -45,8 +45,7 @@ public Server allocateAddressAndCreate(
String name = String.format("InProcessServer_%s", serviceNameUniqifier.getAndIncrement());
builder.setUrl(name);
InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName(name);
- services
- .stream()
+ services.stream()
.forEach(
service ->
serverBuilder.addService(
@@ -59,8 +58,7 @@ public Server allocateAddressAndCreate(
public Server create(List<BindableService> services, ApiServiceDescriptor serviceDescriptor)
throws IOException {
InProcessServerBuilder builder = InProcessServerBuilder.forName(serviceDescriptor.getUrl());
- services
- .stream()
+ services.stream()
.forEach(
service ->
builder.addService(
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
index 38cae631a975..298f054921e4 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -150,8 +150,7 @@ private static Server createServer(List<BindableService> services, InetSocketAdd
// buffer size in the layers above.
.maxMessageSize(Integer.MAX_VALUE)
.permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS);
- services
- .stream()
+ services.stream()
.forEach(
service ->
builder.addService(
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
index 1ffe4390e74c..0d8372725928 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
@@ -104,9 +104,7 @@ public void getArtifact(
ArtifactApi.ProxyManifest proxyManifest = MANIFEST_CACHE.get(request.getRetrievalToken());
// look for file at URI specified by proxy manifest location
ArtifactApi.ProxyManifest.Location location =
- proxyManifest
- .getLocationList()
- .stream()
+ proxyManifest.getLocationList().stream()
.filter(loc -> loc.getName().equals(name))
.findFirst()
.orElseThrow(
@@ -117,8 +115,7 @@ public void getArtifact(
List<ArtifactMetadata> existingArtifacts = proxyManifest.getManifest().getArtifactList();
ArtifactMetadata metadata =
- existingArtifacts
- .stream()
+ existingArtifacts.stream()
.filter(meta -> meta.getName().equals(name))
.findFirst()
.orElseThrow(
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
index e477cd9f7f54..04a883dc4603 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
@@ -110,9 +110,7 @@ private static ExecutableProcessBundleDescriptor fromExecutableStageInternal(
// Create with all of the processing transforms, and all of the components.
// TODO: Remove the unreachable subcomponents if the size of the descriptor matters.
Map<String, PTransform> stageTransforms =
- stage
- .getTransforms()
- .stream()
+ stage.getTransforms().stream()
.collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform));
Components.Builder components =
@@ -388,7 +386,8 @@ private static TargetEncoding addStageOutput(
outputTargetCodersBuilder.put(targetEncoding.getTarget(), targetEncoding.getCoder());
components.putTransforms(
timerReference.transform().getId(),
- // Since a transform can have more then one timer, update the transform inside components and not the original
+ // Since a transform can have more then one timer, update the transform inside components
+ // and not the original
components
.getTransformsOrThrow(timerReference.transform().getId())
.toBuilder()
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
index fb858dc784c6..94834fcbb471 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
@@ -128,7 +128,8 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
.addAll(gcsCredentialArgs())
// NOTE: Host networking does not work on Mac, but the command line flag is accepted.
.add("--network=host")
- // We need to pass on the information about Docker-on-Mac environment (due to missing host networking on Mac)
+ // We need to pass on the information about Docker-on-Mac environment (due to missing
+ // host networking on Mac)
.add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
if (!retainDockerContainer) {
@@ -206,7 +207,8 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
* likely only support the latest version at any time.
*/
private static class DockerOnMac {
- // TODO: This host name seems to change with every other Docker release. Do we attempt to keep up
+ // TODO: This host name seems to change with every other Docker release. Do we attempt to keep
+ // up
// or attempt to document the supported Docker version(s)?
private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
@@ -280,7 +282,8 @@ private static Platform getPlatform() {
String osName = System.getProperty("os.name").toLowerCase();
// TODO: Make this more robust?
// The DOCKER_MAC_CONTAINER environment variable is necessary to detect whether we run on
- // a container on MacOs. MacOs internally uses a Linux VM which makes it indistinguishable from Linux.
+ // a container on MacOs. MacOs internally uses a Linux VM which makes it indistinguishable
+ // from Linux.
// We still need to apply port mapping due to missing host networking.
if (osName.startsWith("mac") || DockerOnMac.RUNNING_INSIDE_DOCKER_ON_MAC) {
return Platform.MAC;
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
index 259a69394d8f..05dd45b1f451 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
@@ -445,9 +445,7 @@ private void assertFiles(Set<String> files, String retrievalToken) throws Except
Assert.assertEquals(
"Files in locations does not match actual file list.",
files,
- proxyManifest
- .getLocationList()
- .stream()
+ proxyManifest.getLocationList().stream()
.map(Location::getName)
.collect(Collectors.toSet()));
Assert.assertEquals(
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
index 958d242a7567..75d2bb675c9f 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactoryTest.java
@@ -103,9 +103,7 @@ public void closeShutsDownEnvironments() throws Exception {
p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
ExecutableStage stage =
- GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p))
- .getFusedStages()
- .stream()
+ GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)).getFusedStages().stream()
.findFirst()
.get();
RemoteEnvironment remoteEnv = mock(RemoteEnvironment.class);
@@ -124,9 +122,7 @@ public void closeShutsDownEnvironmentsWhenSomeFail() throws Exception {
p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
ExecutableStage firstEnvStage =
- GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p))
- .getFusedStages()
- .stream()
+ GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)).getFusedStages().stream()
.findFirst()
.get();
ExecutableStagePayload basePayload =
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
index 7c5e9f9e7a0b..78a88fbe01b0 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
@@ -105,8 +105,7 @@ public Admin(BoundedSource<T> source, SamzaPipelineOptions pipelineOptions) {
@Override
public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
- return streamNames
- .stream()
+ return streamNames.stream()
.collect(
Collectors.toMap(
Function.<String>identity(),
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
index 8defed92a7b1..88affe4c0d4d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
@@ -118,8 +118,7 @@ public Admin(UnboundedSource<T, CheckpointMarkT> source, SamzaPipelineOptions pi
@Override
public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
- return streamNames
- .stream()
+ return streamNames.stream()
.collect(
Collectors.toMap(
Function.<String>identity(),
@@ -313,7 +312,7 @@ public void run() {
updateWatermark();
if (!elementAvailable) {
- //TODO: make poll interval configurable
+ // TODO: make poll interval configurable
Thread.sleep(50);
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java
index 25ebe609743f..0b00e89dac91 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java
@@ -69,7 +69,7 @@ public void updateMetrics() {
final GaugeUpdater updateGauge = new GaugeUpdater();
results.getGauges().forEach(updateGauge);
- //TODO: add distribution metrics to Samza
+ // TODO: add distribution metrics to Samza
}
private class CounterUpdater implements Consumer<MetricResult<Long>> {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 1d2907447d8a..401cd117b949 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -82,9 +82,8 @@
// This is derivable from pushbackValues which is persisted to a store.
// TODO: eagerly initialize the hold in init
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
- justification = "No bug",
- value = "SE_TRANSIENT_FIELD_NOT_RESTORED"
- )
+ justification = "No bug",
+ value = "SE_TRANSIENT_FIELD_NOT_RESTORED")
private transient Instant pushbackWatermarkHold;
// TODO: add this to checkpointable state
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 67c343cbb78f..94e319cd916a 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -536,7 +536,7 @@ public void closeIterators() {
private class SamzaMapStateImpl<KeyT, ValueT> extends AbstractSamzaState<ValueT>
implements SamzaMapState<KeyT, ValueT>, KeyValueIteratorState {
- private static final int MAX_KEY_SIZE = 100000; //100K bytes
+ private static final int MAX_KEY_SIZE = 100000; // 100K bytes
private final Coder<KeyT> keyCoder;
private final byte[] maxKey;
private final int storeKeySize;
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java
index b9692a221a49..48fb96917cb3 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/WindowAssignOp.java
@@ -39,8 +39,7 @@ public void processElement(WindowedValue<T> inputElement, OpEmitter<T> emitter)
throw new RuntimeException(e);
}
- windows
- .stream()
+ windows.stream()
.map(
window ->
WindowedValue.of(
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index c144428a13ef..7e7457dfc604 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -71,10 +71,7 @@ public void translate(
TranslationContext ctx) {
final PCollection<? extends InT> input = ctx.getInput(transform);
final Map<TupleTag<?>, Coder<?>> outputCoders =
- ctx.getCurrentTransform()
- .getOutputs()
- .entrySet()
- .stream()
+ ctx.getCurrentTransform().getOutputs().entrySet().stream()
.filter(e -> e.getValue() instanceof PCollection)
.collect(
Collectors.toMap(e -> e.getKey(), e -> ((PCollection<?>) e.getValue()).getCoder()));
@@ -93,9 +90,7 @@ public void translate(
final MessageStream<OpMessage<InT>> inputStream = ctx.getMessageStream(input);
final List<MessageStream<OpMessage<InT>>> sideInputStreams =
- transform
- .getSideInputs()
- .stream()
+ transform.getSideInputs().stream()
.map(ctx::<InT>getViewStream)
.collect(Collectors.toList());
final Map<TupleTag<?>, Integer> tagToIdMap = new HashMap<>();
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java
index 82e772ab5441..928d2887eec2 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestBoundedSource.java
@@ -52,8 +52,7 @@ private TestBoundedSource(List<List<Event<T>>> events) {
@Override
public List<? extends BoundedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- return events
- .stream()
+ return events.stream()
.map(ev -> new TestBoundedSource<>(Collections.singletonList(ev)))
.collect(Collectors.toList());
}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java
index 0d0710e044d3..d526d859dea4 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestCheckpointMark.java
@@ -31,7 +31,7 @@ private TestCheckpointMark(int checkpoint) {
@Override
public void finalizeCheckpoint() throws IOException {
- //DO NOTHING
+ // DO NOTHING
}
static TestCheckpointMark of(int checkpoint) {
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java
index a41adbc4e891..59845e0ffacb 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/TestUnboundedSource.java
@@ -62,8 +62,7 @@ private TestUnboundedSource(List<List<Event<T>>> events) {
@Override
public List<? extends UnboundedSource<T, TestCheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
- return events
- .stream()
+ return events.stream()
.map(ev -> new TestUnboundedSource<>(Collections.singletonList(ev)))
.collect(Collectors.toList());
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
index 49a4c1aa6fe2..3ed6480a8eef 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -77,8 +77,7 @@ private boolean knownComposite(Class<PTransform<?, ?>> transform) {
private boolean shouldDebug(final TransformHierarchy.Node node) {
return node == null
- || (!transforms
- .stream()
+ || (!transforms.stream()
.anyMatch(
debugTransform ->
debugTransform.getNode().equals(node) && debugTransform.isComposite())
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
index 5e73f52a692d..8acf897884e2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
@@ -98,8 +98,7 @@ private CoderHelpers() {}
*/
public static <T> Iterable<T> fromByteArrays(
Collection<byte[]> serialized, final Coder<T> coder) {
- return serialized
- .stream()
+ return serialized.stream()
.map(bytes -> fromByteArray(checkNotNull(bytes, "Cannot decode null values."), coder))
.collect(Collectors.toList());
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index 9d207550684f..ce460a3d848a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -85,7 +85,7 @@
*
* @param <T> The type of the element in this stream.
*/
-//TODO: write a proper Builder enforcing all those rules mentioned.
+// TODO: write a proper Builder enforcing all those rules mentioned.
public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
public static final String TRANSFORM_URN = "beam:transform:spark:createstream:v1";
@@ -96,7 +96,7 @@
private Instant initialSystemTime;
private final boolean forceWatermarkSync;
- private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; //for test purposes.
+ private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; // for test purposes.
private CreateStream(
Duration batchDuration,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index b327a9b25fc3..abc8244e10bf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -167,7 +167,7 @@ int getNumPartitions() {
return numPartitions;
}
- //---- Bound by time.
+ // ---- Bound by time.
// return the largest between the proportional read time (%batchDuration dedicated for read)
// and the min. read time set.
@@ -184,7 +184,7 @@ private Duration boundReadDuration(double readTimePercentage, long minReadTimeMi
return readDuration;
}
- //---- Bound by records.
+ // ---- Bound by records.
private scala.Option<Long> rateControlledMaxRecords() {
final scala.Option<RateController> rateControllerOption = rateController();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index d88e2b8f044e..e61a088a59c3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -99,7 +99,7 @@
/** State and Timers wrapper. */
public static class StateAndTimers implements Serializable {
- //Serializable state for internals (namespace to state tag to coded value).
+ // Serializable state for internals (namespace to state tag to coded value).
private final Table<String, String, byte[]> state;
private final Collection<byte[]> serTimers;
@@ -151,7 +151,8 @@ public void outputWindowedValue(
extends AbstractFunction1<
Iterator<
Tuple3<
- /*K*/ ByteArray, Seq</*Itr<WV<I>>*/ byte[]>,
+ /*K*/ ByteArray,
+ Seq</*Itr<WV<I>>*/ byte[]>,
Option<Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>>,
Iterator<
Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>>
@@ -412,10 +413,11 @@ public void outputWindowedValue(
apply(
final Iterator<
Tuple3<
- /*K*/ ByteArray, Seq</*Itr<WV<I>>*/ byte[]>,
+ /*K*/ ByteArray,
+ Seq</*Itr<WV<I>>*/ byte[]>,
Option<Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>>
input) {
- //--- ACTUAL STATEFUL OPERATION:
+ // --- ACTUAL STATEFUL OPERATION:
//
// Input Iterator: the partition (~bundle) of a co-grouping of the input
// and the previous state (if exists).
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
index 8cb0c0af0a7b..41dbbfa3ce86 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkStateInternals.java
@@ -49,7 +49,7 @@
class SparkStateInternals<K> implements StateInternals {
private final K key;
- //Serializable state for internals (namespace to state tag to coded value).
+ // Serializable state for internals (namespace to state tag to coded value).
private final Table<String, String, byte[]> stateTable;
private SparkStateInternals(K key) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 2d21ba566a28..bafe6bf69765 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -93,12 +93,16 @@
*/
public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
scala.Function3<
- Source<T>, Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>,
+ Source<T>,
+ Option<CheckpointMarkT>,
+ State<Tuple2<byte[], Instant>>,
Tuple2<Iterable<byte[]>, Metadata>>
mapSourceFunction(final SerializablePipelineOptions options, final String stepName) {
return new SerializableFunction3<
- Source<T>, Option<CheckpointMarkT>, State<Tuple2<byte[], Instant>>,
+ Source<T>,
+ Option<CheckpointMarkT>,
+ State<Tuple2<byte[], Instant>>,
Tuple2<Iterable<byte[]>, Metadata>>() {
@Override
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 33e12e1da37e..c860906e2a8e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -83,8 +83,7 @@
JavaRDDLike<byte[], ?> bytesRDD = rdd.map(CoderHelpers.toByteFunction(windowedValueCoder));
List<byte[]> clientBytes = bytesRDD.collect();
windowedValues =
- clientBytes
- .stream()
+ clientBytes.stream()
.map(bytes -> CoderHelpers.fromByteArray(bytes, windowedValueCoder))
.collect(Collectors.toList());
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index a1151c662d5b..88f706f256c2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -131,10 +131,7 @@ public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
}
public Map<TupleTag<?>, Coder<?>> getOutputCoders() {
- return currentTransform
- .getOutputs()
- .entrySet()
- .stream()
+ return currentTransform.getOutputs().entrySet().stream()
.filter(e -> e.getValue() instanceof PCollection)
.collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
index 67672a2ed95e..8336350cb43f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java
@@ -113,7 +113,7 @@ public PipelineOptions getPipelineOptions() {
@Override
public <T> T sideInput(PCollectionView<T> view) {
checkNotNull(input, "Input in SparkCombineContext must not be null!");
- //validate element window.
+ // validate element window.
final Collection<? extends BoundedWindow> elementWindows = input.getWindows();
checkState(
elementWindows.size() == 1,
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
index b9227f40206c..d5326d076021 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGlobalCombineFn.java
@@ -71,7 +71,7 @@ public SparkGlobalCombineFn(
TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
- //--- inputs iterator, by window order.
+ // --- inputs iterator, by window order.
final Iterator<WindowedValue<InputT>> iterator = sortedInputs.iterator();
WindowedValue<InputT> currentInput = iterator.next();
BoundedWindow currentWindow = Iterables.getFirst(currentInput.getWindows(), null);
@@ -170,7 +170,7 @@ public SparkGlobalCombineFn(
@SuppressWarnings("unchecked")
TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
- //--- accumulators iterator, by window order.
+ // --- accumulators iterator, by window order.
final Iterator<WindowedValue<AccumT>> iterator = sortedAccumulators.iterator();
// get the first accumulator and assign it to the current window's accumulators.
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 83189a750532..d66467091652 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -74,7 +74,7 @@ public SparkGroupAlsoByWindowViaOutputBufferFn(
K key = windowedValue.getValue().getKey();
Iterable<WindowedValue<InputT>> values = windowedValue.getValue().getValue();
- //------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------//
+ // ------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------//
// Used with Batch, we know that all the data is available for this key. We can't use the
// timer manager from the context because it doesn't exist. So we create one and emulate the
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
index 81541276512b..879786ddb9e3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java
@@ -72,7 +72,7 @@ public OutputT apply(WindowedValue<KV<K, Iterable<InputT>>> windowedKv) {
TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
WindowFn<?, BoundedWindow> windowFn = windowingStrategy.getWindowFn();
- //--- inputs iterator, by window order.
+ // --- inputs iterator, by window order.
final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInputs.iterator();
WindowedValue<KV<K, InputT>> currentInput = iterator.next();
BoundedWindow currentWindow = Iterables.getFirst(currentInput.getWindows(), null);
@@ -181,7 +181,7 @@ public OutputT apply(WindowedValue<KV<K, Iterable<InputT>>> windowedKv) {
@SuppressWarnings("unchecked")
TimestampCombiner timestampCombiner = windowingStrategy.getTimestampCombiner();
- //--- accumulators iterator, by window order.
+ // --- accumulators iterator, by window order.
final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedAccumulators.iterator();
// get the first accumulator and assign it to the current window's accumulators.
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
index b64587ded403..163ba55e6d1b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPCollectionView.java
@@ -68,7 +68,7 @@ SideInputBroadcast getPCollectionView(PCollectionView<?> view, JavaSparkContext
}
}
- //lazily broadcast views
+ // lazily broadcast views
SideInputBroadcast helper = broadcastHelperMap.get(view);
if (helper == null) {
synchronized (SparkPCollectionView.class) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index fc5c5a6baf3f..e654aebf390b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -80,7 +80,7 @@ public JavaStreamingContext call() throws Exception {
// We must first init accumulators since translators expect them to be instantiated.
SparkRunner.initAccumulators(options, jsc);
- //do not need to create a MetricsPusher instance here because if is called in SparkRunner.run()
+ // do not need to create a MetricsPusher instance here because if is called in SparkRunner.run()
EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
// update cache candidates
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index debab3427daf..f9253dcfbed7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -49,16 +49,16 @@ public SparkSideInputReader(
@Nullable
@Override
public <T> T get(PCollectionView<T> view, BoundedWindow window) {
- //--- validate sideInput.
+ // --- validate sideInput.
checkNotNull(view, "The PCollectionView passed to sideInput cannot be null ");
KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>> windowedBroadcastHelper =
sideInputs.get(view.getTagInternal());
checkNotNull(windowedBroadcastHelper, "SideInput for view " + view + " is not available.");
- //--- sideInput window
+ // --- sideInput window
final BoundedWindow sideInputWindow = view.getWindowMappingFn().getSideInputWindow(window);
- //--- match the appropriate sideInput window.
+ // --- match the appropriate sideInput window.
// a tag will point to all matching sideInputs, that is all windows.
// now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
Iterable<WindowedValue<KV<?, ?>>> availableSideInputs =
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
index d166d8dabbba..7b72b546e2ef 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
@@ -46,16 +46,10 @@ public InMemoryMetrics(
// this might fail in case we have multiple aggregators with the same suffix after
// the last dot, but it should be good enough for tests.
if (extendedMetricsRegistry != null
- && extendedMetricsRegistry
- .getGauges()
- .keySet()
- .stream()
+ && extendedMetricsRegistry.getGauges().keySet().stream()
.anyMatch(Predicates.containsPattern(name + "$")::apply)) {
String key =
- extendedMetricsRegistry
- .getGauges()
- .keySet()
- .stream()
+ extendedMetricsRegistry.getGauges().keySet().stream()
.filter(Predicates.containsPattern(name + "$")::apply)
.findFirst()
.get();
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index ba204a97f0d0..ceb87a17b30b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -133,9 +133,8 @@ private static void produce(Map<String, Instant> messages) {
Serializer<String> stringSerializer = new StringSerializer();
Serializer<Instant> instantSerializer = new InstantSerializer();
- try (
- KafkaProducer<String, Instant> kafkaProducer =
- new KafkaProducer(producerProps, stringSerializer, instantSerializer)) {
+ try (KafkaProducer<String, Instant> kafkaProducer =
+ new KafkaProducer(producerProps, stringSerializer, instantSerializer)) {
for (Map.Entry<String, Instant> en : messages.entrySet()) {
kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
}
@@ -179,12 +178,12 @@ public void testWithResume() throws Exception {
"EOFShallNotPassFn",
4L)));
- //--- between executions:
+ // --- between executions:
- //- clear state.
+ // - clear state.
clean();
- //- write a bit more.
+ // - write a bit more.
produce(
ImmutableMap.of(
"k5", new Instant(499),
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 05bdee080ea1..77dedb33511d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -40,7 +40,8 @@
})
/**
- * You can indicate a category for the experimental feature. This is unused and serves only as a hint to the reader.
+ * You can indicate a category for the experimental feature. This is unused and serves only as a
+ * hint to the reader.
*/
@Documented
public @interface Experimental {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index ad20d63c86ac..bdb971b06143 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -126,9 +126,7 @@ private void verifyDeterministic(Schema schema)
throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
List<Coder<?>> coders =
- schema
- .getFields()
- .stream()
+ schema.getFields().stream()
.map(Field::getType)
.map(RowCoder::coderForFieldType)
.collect(Collectors.toList());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index fb786f5caa55..1a5459229d79 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -944,8 +944,7 @@ private CreateTextSourceFn(byte[] delimiter) {
checkArgument(
1
== Iterables.size(
- allToArgs
- .stream()
+ allToArgs.stream()
.filter(Predicates.notNull()::apply)
.collect(Collectors.toList())),
"Exactly one of filename policy, dynamic destinations, filename prefix, or destination "
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index b712ab46f917..1629b7a3283a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -354,8 +354,7 @@ static boolean printHelpUsageAndExitIfNeeded(
} catch (ClassNotFoundException e) {
// If we didn't find an exact match, look for any that match the class name.
Iterable<Class<? extends PipelineOptions>> matches =
- getRegisteredOptions()
- .stream()
+ getRegisteredOptions().stream()
.filter(
input -> {
if (helpOption.contains(".")) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index 9b76a4cf4d8d..df2df863b24d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -746,8 +746,7 @@ public PipelineOptions deserialize(JsonParser jp, DeserializationContext ctxt)
if (rawOptionsNode != null && !rawOptionsNode.isNull()) {
ObjectNode optionsNode = (ObjectNode) rawOptionsNode;
for (Iterator<Map.Entry<String, JsonNode>> iterator = optionsNode.fields();
- iterator != null && iterator.hasNext();
- ) {
+ iterator != null && iterator.hasNext(); ) {
Map.Entry<String, JsonNode> field = iterator.next();
fields.put(field.getKey(), field.getValue());
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
index 4df0a9313ca2..91659c814f4f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
@@ -42,8 +42,7 @@
public List<FieldValueTypeInformation> get(Class<?> clazz) {
// If the generated class is passed in, we want to look at the base class to find the getters.
Class<?> targetClass = AutoValueUtils.getBaseAutoValueClass(clazz);
- return ReflectUtils.getMethods(targetClass)
- .stream()
+ return ReflectUtils.getMethods(targetClass).stream()
.filter(ReflectUtils::isGetter)
// All AutoValue getters are marked abstract.
.filter(m -> Modifier.isAbstract(m.getModifiers()))
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
index d2ee4fdec11d..d54aaab25680 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
@@ -263,17 +263,13 @@ private FieldAccessDescriptor resolvedNestedFieldsHelper(
}
nestedFields.putAll(
- getNestedFieldsAccessedByName()
- .entrySet()
- .stream()
+ getNestedFieldsAccessedByName().entrySet().stream()
.collect(
Collectors.toMap(
e -> schema.indexOf(e.getKey()),
e -> resolvedNestedFieldsHelper(schema.getField(e.getKey()), e.getValue()))));
nestedFields.putAll(
- getNestedFieldsAccessedById()
- .entrySet()
- .stream()
+ getNestedFieldsAccessedById().entrySet().stream()
.collect(
Collectors.toMap(
e -> validateFieldId(schema, e.getKey()),
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
index 5ff50460b800..881b1cec75f1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldTypeDescriptors.java
@@ -30,6 +30,7 @@
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableBiMap;
import org.joda.time.Instant;
+
/**
* Utilities for converting between {@link Schema} field types and {@link TypeDescriptor}s that
* define Java objects which can represent these field types.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
index 61fca2ecb262..176f97dc1f14 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
@@ -53,8 +53,7 @@
@Override
public List<FieldValueTypeInformation> get(Class<?> clazz) {
- return ReflectUtils.getMethods(clazz)
- .stream()
+ return ReflectUtils.getMethods(clazz).stream()
.filter(ReflectUtils::isGetter)
.filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
.map(FieldValueTypeInformation::forGetter)
@@ -74,8 +73,7 @@
@Override
public List<FieldValueTypeInformation> get(Class<?> clazz) {
- return ReflectUtils.getMethods(clazz)
- .stream()
+ return ReflectUtils.getMethods(clazz).stream()
.filter(ReflectUtils::isSetter)
.filter(m -> !m.isAnnotationPresent(SchemaIgnore.class))
.map(FieldValueTypeInformation::forSetter)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
index 88986d4875c4..cba5448ca22b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
@@ -57,8 +57,7 @@
@Override
public List<FieldValueTypeInformation> get(Class<?> clazz) {
List<FieldValueTypeInformation> types =
- ReflectUtils.getFields(clazz)
- .stream()
+ ReflectUtils.getFields(clazz).stream()
.filter(f -> !f.isAnnotationPresent(SchemaIgnore.class))
.map(FieldValueTypeInformation::forField)
.map(
@@ -73,8 +72,7 @@
if (ReflectUtils.getAnnotatedCreateMethod(clazz) == null
&& ReflectUtils.getAnnotatedConstructor(clazz) == null) {
Optional<Field> finalField =
- types
- .stream()
+ types.stream()
.map(FieldValueTypeInformation::getField)
.filter(f -> Modifier.isFinal(f.getModifiers()))
.findAny();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 5592d1385beb..84e89d6be55a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -256,14 +256,11 @@ private boolean equivalent(Schema other, EquivalenceNullablePolicy nullablePolic
}
List<Field> otherFields =
- other
- .getFields()
- .stream()
+ other.getFields().stream()
.sorted(Comparator.comparing(Field::getName))
.collect(Collectors.toList());
List<Field> actualFields =
- getFields()
- .stream()
+ getFields().stream()
.sorted(Comparator.comparing(Field::getName))
.collect(Collectors.toList());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java
index 1059aa16775e..024131efe44c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Cast.java
@@ -272,8 +272,7 @@ public void verifyCompatibility(Schema inputSchema) {
if (!errors.isEmpty()) {
String reason =
- errors
- .stream()
+ errors.stream()
.map(x -> Joiner.on('.').join(x.path()) + ": " + x.message())
.collect(Collectors.joining("\n\t"));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
index 5804d4116248..1ee6fadf756c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
@@ -242,10 +242,7 @@ private FieldAccessDescriptor getFieldAccessDescriptor(TupleTag<?> tag) {
KeyedPCollectionTuple<Row> keyedPCollectionTuple =
KeyedPCollectionTuple.empty(input.getPipeline());
List<TupleTag<Row>> sortedTags =
- input
- .getAll()
- .keySet()
- .stream()
+ input.getAll().keySet().stream()
.sorted(Comparator.comparing(TupleTag::getId))
.map(t -> new TupleTag<Row>(t.getId() + "_ROW"))
.collect(Collectors.toList());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
index c7716855efd9..5f5626110226 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Filter.java
@@ -125,9 +125,7 @@
for (String fieldName :
Sets.union(
fieldNameFilters.keySet(),
- fieldNamesFilters
- .keySet()
- .stream()
+ fieldNamesFilters.keySet().stream()
.flatMap(List::stream)
.collect(Collectors.toSet()))) {
schema.getField(fieldName);
@@ -135,9 +133,7 @@
for (int fieldIndex :
Sets.union(
fieldIdFilters.keySet(),
- fieldIdsFilters
- .keySet()
- .stream()
+ fieldIdsFilters.keySet().stream()
.flatMap(List::stream)
.collect(Collectors.toSet()))) {
if (fieldIndex >= schema.getFieldCount() || fieldIndex < 0) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
index 2fe8698740fb..6462e980026e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaAggregateFn.java
@@ -145,8 +145,7 @@
/** Once the schema is known, this function is called by the {@link Group} transform. */
Inner<T> withSchema(Schema inputSchema, SerializableFunction<T, Row> toRowFunction) {
List<FieldAggregation> fieldAggregations =
- getFieldAggregations()
- .stream()
+ getFieldAggregations().stream()
.map(f -> f.resolve(inputSchema))
.collect(Collectors.toList());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
index 65f398b37ba2..a94e77e3ef35 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
@@ -30,6 +30,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+
/**
* A {@link PTransform} to unnest nested rows.
*
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
index c5c29418ba53..0dec26095343 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
@@ -125,8 +125,7 @@ private static boolean matchConstructor(
}
Map<String, FieldValueTypeInformation> typeMap =
- getterTypes
- .stream()
+ getterTypes.stream()
.collect(
Collectors.toMap(
f -> ReflectUtils.stripGetterPrefix(f.getMethod().getName()),
@@ -151,8 +150,7 @@ public static SchemaUserTypeCreator getBuilderCreator(
}
Map<String, FieldValueTypeInformation> setterTypes =
- ReflectUtils.getMethods(builderClass)
- .stream()
+ ReflectUtils.getMethods(builderClass).stream()
.filter(ReflectUtils::isSetter)
.map(FieldValueTypeInformation::forSetter)
.collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
@@ -176,8 +174,7 @@ public static SchemaUserTypeCreator getBuilderCreator(
}
Method buildMethod =
- ReflectUtils.getMethods(builderClass)
- .stream()
+ ReflectUtils.getMethods(builderClass).stream()
.filter(m -> m.getName().equals("build"))
.findAny()
.orElseThrow(() -> new RuntimeException("No build method in builder"));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 84278bef0789..39d6323209f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -88,8 +88,7 @@
// don't need recursion because nested unions aren't supported in AVRO
List<org.apache.avro.Schema> nonNullTypes =
- types
- .stream()
+ types.stream()
.filter(x -> x.getType() != org.apache.avro.Schema.Type.NULL)
.collect(Collectors.toList());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
index a435d45e4fb9..8aaa200ba4e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -68,8 +68,7 @@ public static Schema schemaFromJavaBeanClass(
public static void validateJavaBean(
List<FieldValueTypeInformation> getters, List<FieldValueTypeInformation> setters) {
Map<String, FieldValueTypeInformation> setterMap =
- setters
- .stream()
+ setters.stream()
.collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
for (FieldValueTypeInformation type : getters) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
index 5d62e82dec27..073ead1e0bd1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
@@ -39,12 +39,9 @@
public static List<FieldValueTypeInformation> sortBySchema(
List<FieldValueTypeInformation> types, Schema schema) {
Map<String, FieldValueTypeInformation> typeMap =
- types
- .stream()
+ types.stream()
.collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity()));
- return schema
- .getFields()
- .stream()
+ return schema.getFields().stream()
.map(f -> typeMap.get(f.getName()))
.collect(Collectors.toList());
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 2cd71f5907d8..3f8b22be7358 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -332,8 +332,7 @@ public void finishBundle() throws Exception {
/** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */
@Deprecated
public List<OutputT> peekOutputElements() {
- return peekOutputElementsWithTimestamp()
- .stream()
+ return peekOutputElementsWithTimestamp().stream()
.map(TimestampedValue::getValue)
.collect(Collectors.toList());
}
@@ -342,8 +341,7 @@ public void finishBundle() throws Exception {
@Deprecated
public List<TimestampedValue<OutputT>> peekOutputElementsWithTimestamp() {
// TODO: Should we return an unmodifiable list?
- return getImmutableOutput(mainOutputTag)
- .stream()
+ return getImmutableOutput(mainOutputTag).stream()
.map(input -> TimestampedValue.of(input.getValue(), input.getTimestamp()))
.collect(Collectors.toList());
}
@@ -394,8 +392,7 @@ public void clearOutputElements() {
@Deprecated
public <T> List<T> peekOutputElements(TupleTag<T> tag) {
// TODO: Should we return an unmodifiable list?
- return getImmutableOutput(tag)
- .stream()
+ return getImmutableOutput(tag).stream()
.map(ValueInSingleWindow::getValue)
.collect(Collectors.toList());
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 19ff713ef0e8..8c2a11bdaa9e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -173,7 +173,7 @@ public void processElement(
numElementsInBatch.add(1L);
Long num = numElementsInBatch.read();
if (num % prefetchFrequency == 0) {
- //prefetch data and modify batch state (readLater() modifies this)
+ // prefetch data and modify batch state (readLater() modifies this)
batch.readLater();
}
if (num >= batchSize) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 4c47ba98e848..878c970ae841 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -679,8 +679,7 @@ static ProcessElementMethod create(
* each scoped to a single window.
*/
public boolean observesWindow() {
- return extraParameters()
- .stream()
+ return extraParameters().stream()
.anyMatch(
Predicates.or(
Predicates.instanceOf(WindowParameter.class),
@@ -696,8 +695,7 @@ public boolean observesWindow() {
@Nullable
public RowParameter getRowParameter() {
Optional<Parameter> parameter =
- extraParameters()
- .stream()
+ extraParameters().stream()
.filter(Predicates.instanceOf(RowParameter.class)::apply)
.findFirst();
return parameter.isPresent() ? ((RowParameter) parameter.get()) : null;
@@ -707,8 +705,7 @@ public RowParameter getRowParameter() {
@Nullable
public OutputReceiverParameter getMainOutputReceiver() {
Optional<Parameter> parameter =
- extraParameters()
- .stream()
+ extraParameters().stream()
.filter(Predicates.instanceOf(OutputReceiverParameter.class)::apply)
.findFirst();
return parameter.isPresent() ? ((OutputReceiverParameter) parameter.get()) : null;
@@ -718,8 +715,7 @@ public OutputReceiverParameter getMainOutputReceiver() {
* Whether this {@link DoFn} is <a href="https://s.apache.org/splittable-do-fn">splittable</a>.
*/
public boolean isSplittable() {
- return extraParameters()
- .stream()
+ return extraParameters().stream()
.anyMatch(Predicates.instanceOf(RestrictionTrackerParameter.class)::apply);
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 61089df347b9..33e6a7646c9f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -226,8 +226,7 @@ private MethodAnalysisContext() {}
/** Indicates whether a {@link RestrictionTrackerParameter} is known in this context. */
public boolean hasRestrictionTrackerParameter() {
- return extraParameters
- .stream()
+ return extraParameters.stream()
.anyMatch(Predicates.instanceOf(RestrictionTrackerParameter.class)::apply);
}
@@ -238,8 +237,7 @@ public boolean hasWindowParameter() {
/** Indicates whether a {@link Parameter.PipelineOptionsParameter} is known in this context. */
public boolean hasPipelineOptionsParamter() {
- return extraParameters
- .stream()
+ return extraParameters.stream()
.anyMatch(Predicates.instanceOf(Parameter.PipelineOptionsParameter.class)::apply);
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
index 3101732bba84..ea231c2c0a54 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
@@ -83,8 +83,7 @@ public FilePatternMatchingShardedFile(String filePattern) {
Collection<Metadata> files = FileSystems.match(filePattern).metadata();
LOG.debug(
"Found file(s) {} by matching the path: {}",
- files
- .stream()
+ files.stream()
.map(Metadata::resourceId)
.map(ResourceId::getFilename)
.collect(Collectors.joining(",")),
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
index b5e724d5af3f..b359bb532e21 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
@@ -166,8 +166,7 @@ public static boolean isCancelled(CompletionStage<?> future) {
return blockAndDiscard.thenApply(
nothing ->
- futures
- .stream()
+ futures.stream()
.map(future -> future.toCompletableFuture().join())
.collect(Collectors.toList()));
}
@@ -179,9 +178,8 @@ public static boolean isCancelled(CompletionStage<?> future) {
* #allAsList(Collection)}.
*/
@SuppressWarnings(
- value = "NM_CLASS_NOT_EXCEPTION",
- justification = "The class does hold an exception; its name is accurate."
- )
+ value = "NM_CLASS_NOT_EXCEPTION",
+ justification = "The class does hold an exception; its name is accurate.")
@AutoValue
public abstract static class ExceptionOrResult<T> {
@@ -218,8 +216,7 @@ public static boolean isCancelled(CompletionStage<?> future) {
return blockAndDiscard.thenApply(
nothing ->
- futures
- .stream()
+ futures.stream()
.map(
future -> {
// The limited scope of the exceptions wrapped allows CancellationException
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
index a6f20a0b3b54..3b1805f1d206 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
@@ -145,10 +145,7 @@ private static Row jsonObjectToRow(FieldValue rowFieldValue) {
+ "can be parsed to Beam Rows");
}
- return rowFieldValue
- .rowSchema()
- .getFields()
- .stream()
+ return rowFieldValue.rowSchema().getFields().stream()
.map(
schemaField ->
extractJsonNodeValue(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index 65d37c82e54c..4bc8048135cc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -209,12 +209,12 @@ public void testTransientFieldInitialization() throws Exception {
Pojo value = new Pojo("Hello", 42);
AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
- //Serialization of object
+ // Serialization of object
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bos);
out.writeObject(coder);
- //De-serialization of object
+ // De-serialization of object
ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
ObjectInputStream in = new ObjectInputStream(bis);
AvroCoder<Pojo> copied = (AvroCoder<Pojo>) in.readObject();
@@ -232,11 +232,11 @@ public void testKryoSerialization() throws Exception {
Pojo value = new Pojo("Hello", 42);
AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
- //Kryo instantiation
+ // Kryo instantiation
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
- //Serialization of object without any memoization
+ // Serialization of object without any memoization
ByteArrayOutputStream coderWithoutMemoizationBos = new ByteArrayOutputStream();
try (Output output = new Output(coderWithoutMemoizationBos)) {
kryo.writeObject(output, coder);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index 4e0621a6374e..1163b6e89bce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -293,8 +293,7 @@ public void testDynamicDefaultFilenamePolicy() throws Exception {
String[] aElements =
Iterables.toArray(
StreamSupport.stream(
- elements
- .stream()
+ elements.stream()
.filter(
Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())
::apply)
@@ -307,8 +306,7 @@ public void testDynamicDefaultFilenamePolicy() throws Exception {
String[] bElements =
Iterables.toArray(
StreamSupport.stream(
- elements
- .stream()
+ elements.stream()
.filter(
Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())
::apply)
@@ -321,8 +319,7 @@ public void testDynamicDefaultFilenamePolicy() throws Exception {
String[] cElements =
Iterables.toArray(
StreamSupport.stream(
- elements
- .stream()
+ elements.stream()
.filter(
Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())
::apply)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java
index 131c1779109a..6d26ef570e54 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/FieldTypeDescriptorsTest.java
@@ -27,6 +27,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+
/** test for {@link FieldTypeDescriptors}. */
public class FieldTypeDescriptorsTest {
@Test
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java
index 5cdf56ddfafe..462fb05d9b19 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java
@@ -34,6 +34,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
+
/** Tests for {@link org.apache.beam.sdk.schemas.transforms.Unnest}. */
public class UnnestTest implements Serializable {
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@@ -78,16 +79,14 @@ public void testSimpleUnnesting() {
.mapToObj(i -> Row.withSchema(SIMPLE_SCHEMA).addValues(i, Integer.toString(i)).build())
.collect(Collectors.toList());
List<Row> rows =
- bottomRow
- .stream()
+ bottomRow.stream()
.map(r -> Row.withSchema(NESTED_SCHEMA).addValues(r, r).build())
.collect(Collectors.toList());
PCollection<Row> unnested =
pipeline.apply(Create.of(rows).withRowSchema(NESTED_SCHEMA)).apply(Unnest.create());
assertEquals(UNNESTED_SCHEMA, unnested.getSchema());
List<Row> expected =
- bottomRow
- .stream()
+ bottomRow.stream()
.map(
r ->
Row.withSchema(UNNESTED_SCHEMA)
@@ -116,8 +115,7 @@ public void testAlternateNamePolicy() {
.mapToObj(i -> Row.withSchema(SIMPLE_SCHEMA).addValues(i, Integer.toString(i)).build())
.collect(Collectors.toList());
List<Row> rows =
- bottomRow
- .stream()
+ bottomRow.stream()
.map(r -> Row.withSchema(NESTED_SCHEMA2).addValues(r).build())
.collect(Collectors.toList());
PCollection<Row> unnested =
@@ -126,8 +124,7 @@ public void testAlternateNamePolicy() {
.apply(Unnest.<Row>create().withFieldNameFunction(Unnest.KEEP_NESTED_NAME));
assertEquals(UNNESTED2_SCHEMA_ALTERNATE, unnested.getSchema());
List<Row> expected =
- bottomRow
- .stream()
+ bottomRow.stream()
.map(
r ->
Row.withSchema(UNNESTED2_SCHEMA_ALTERNATE)
@@ -148,8 +145,7 @@ public void testClashingNamePolicy() {
.collect(Collectors.toList());
thrown.expect(IllegalArgumentException.class);
List<Row> rows =
- bottomRow
- .stream()
+ bottomRow.stream()
.map(r -> Row.withSchema(NESTED_SCHEMA).addValues(r, r).build())
.collect(Collectors.toList());
PCollection<Row> unnested =
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index e753ba388840..8fd8f859f62e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -63,8 +63,10 @@ public void testFlatMapSimpleFunction() throws Exception {
pipeline
.apply(Create.of(1, 2, 3))
- // Note that FlatMapElements takes an InferableFunction<InputT, ? extends Iterable<OutputT>>
- // so the use of List<Integer> here (as opposed to Iterable<Integer>) deliberately exercises
+ // Note that FlatMapElements takes an InferableFunction<InputT, ? extends
+ // Iterable<OutputT>>
+ // so the use of List<Integer> here (as opposed to Iterable<Integer>) deliberately
+ // exercises
// the use of an upper bound.
.apply(
FlatMapElements.via(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index b1d00e6c5f98..a36010dbcdf2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -85,9 +85,7 @@ private void method(
assertTrue(signature.isSplittable());
assertTrue(
- signature
- .extraParameters()
- .stream()
+ signature.extraParameters().stream()
.anyMatch(
Predicates.instanceOf(DoFnSignature.Parameter.RestrictionTrackerParameter.class)
::apply));
@@ -310,7 +308,8 @@ public CoderT getRestrictionCoder() {
DoFnSignature signature =
DoFnSignatures.getSignature(
new GoodGenericSplittableDoFn<
- SomeRestriction, RestrictionTracker<SomeRestriction, ?>,
+ SomeRestriction,
+ RestrictionTracker<SomeRestriction, ?>,
SomeRestrictionCoder>() {}.getClass());
assertEquals(RestrictionTracker.class, signature.processElement().trackerT().getRawType());
assertTrue(signature.processElement().isSplittable());
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index 9de0408efeae..a516b5987865 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -372,7 +372,8 @@ static String getRegionFromZone(String zone) {
Transport.getJsonFactory(),
chainHttpRequestInitializer(
credentials,
- // Do not log 404. It clutters the output and is possibly even required by the caller.
+ // Do not log 404. It clutters the output and is possibly even required by the
+ // caller.
new RetryHttpRequestInitializer(ImmutableList.of(404))))
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
index 4e0066067dca..0e1b6bde641f 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -96,7 +96,8 @@ private static ApiComponents apiComponentsFromUrl(String urlString) {
getJsonFactory(),
chainHttpRequestInitializer(
options.getGcpCredential(),
- // Do not log the code 404. Code up the stack will deal with 404's if needed, and
+ // Do not log the code 404. Code up the stack will deal with 404's if needed,
+ // and
// logging it by default clutters the output during file staging.
new RetryHttpRequestInitializer(
ImmutableList.of(404), new UploadIdResponseInterceptor())))
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index 51937ad95442..70867508e929 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -51,7 +51,7 @@ public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
Pipeline p = Pipeline.create(options);
- //define the input row format
+ // define the input row format
Schema type =
Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();
@@ -59,7 +59,7 @@ public static void main(String[] args) {
Row row2 = Row.withSchema(type).addValues(2, "row", 2.0).build();
Row row3 = Row.withSchema(type).addValues(3, "row", 3.0).build();
- //create a source PCollection with Create.of();
+ // create a source PCollection with Create.of();
PCollection<Row> inputTable =
PBegin.in(p)
.apply(
@@ -67,7 +67,7 @@ public static void main(String[] args) {
.withSchema(
type, SerializableFunctions.identity(), SerializableFunctions.identity()));
- //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
+ // Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
PCollection<Row> outputStream =
inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
index f3327d857347..318d90df15e7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
@@ -75,10 +75,7 @@ private JdbcConnection(CalciteConnection connection) throws SQLException {
* to a map of pipeline options.
*/
private static Map<String, String> extractPipelineOptions(CalciteConnection calciteConnection) {
- return calciteConnection
- .getProperties()
- .entrySet()
- .stream()
+ return calciteConnection.getProperties().entrySet().stream()
.map(entry -> KV.of(entry.getKey().toString(), entry.getValue().toString()))
.filter(kv -> kv.getKey().startsWith(PIPELINE_OPTION_PREFIX))
.map(kv -> KV.of(kv.getKey().substring(PIPELINE_OPTION_PREFIX.length()), kv.getValue()))
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
index cbdbbb6aa9c9..70aa89d3d6ee 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
@@ -54,7 +54,7 @@
new FunctionParameter() {
@Override
public int getOrdinal() {
- return 0; //up to one parameter is supported in UDAF.
+ return 0; // up to one parameter is supported in UDAF.
}
@Override
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 050aeec189d5..419cef17881e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -116,8 +116,7 @@ public RelWriter explainTerms(RelWriter pw) {
public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
Schema outputSchema = CalciteUtils.toSchema(getRowType());
List<FieldAggregation> aggregationAdapters =
- getNamedAggCalls()
- .stream()
+ getNamedAggCalls().stream()
.map(aggCall -> new FieldAggregation(aggCall.getKey(), aggCall.getValue()))
.collect(toList());
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 3a76a615f461..408922be836f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -284,16 +284,16 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
case ARRAY:
return ((List<?>) beamValue)
.stream()
- .map(elem -> fieldToAvatica(type.getCollectionElementType(), elem))
- .collect(Collectors.toList());
+ .map(elem -> fieldToAvatica(type.getCollectionElementType(), elem))
+ .collect(Collectors.toList());
case MAP:
return ((Map<?, ?>) beamValue)
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- entry -> entry.getKey(),
- entry -> fieldToAvatica(type.getCollectionElementType(), entry.getValue())));
+ .entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ entry -> entry.getKey(),
+ entry ->
+ fieldToAvatica(type.getCollectionElementType(), entry.getValue())));
case ROW:
// TODO: needs to be a Struct
return beamValue;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
index 5f060ec5567a..29e53d1a2e6c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamRelNode.java
@@ -35,8 +35,7 @@
* @return bounded if and only if all PCollection inputs are bounded
*/
default PCollection.IsBounded isBounded() {
- return getPCollectionInputs()
- .stream()
+ return getPCollectionInputs().stream()
.allMatch(
rel ->
BeamSqlRelUtils.getBeamRelInput(rel).isBounded()
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 760e37bf1ed2..bdc95724240d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -157,7 +157,8 @@ public int getCount() {
PCollection<Row> upstream = pinput.get(0);
// There is a need to separate ORDER BY LIMIT and LIMIT:
- // - GroupByKey (used in Top) is not allowed on unbounded data in global window so ORDER BY ... LIMIT
+ // - GroupByKey (used in Top) is not allowed on unbounded data in global window so ORDER BY
+ // ... LIMIT
// works only on bounded data.
// - Just LIMIT operates on unbounded data, but across windows.
if (fieldIndices.isEmpty()) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
index d48a6979e92a..9f0fd5547b32 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
@@ -43,8 +43,7 @@
return PCollectionList.empty(pipeline);
} else {
return PCollectionList.of(
- inputRels
- .stream()
+ inputRels.stream()
.map(input -> BeamSqlRelUtils.toPCollection(pipeline, (BeamRelNode) input, cache))
.collect(Collectors.toList()));
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
index 798f0e6e651c..beb27fc68a1a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamTableUtils.java
@@ -92,8 +92,8 @@ public static String beamRow2CsvLine(Row row, CSVFormat csvFormat) {
/**
* Attempt to cast an object to a specified Schema.Field.Type.
- * @throws IllegalArgumentException if the value cannot be cast to that type.
*
+ * @throws IllegalArgumentException if the value cannot be cast to that type.
* @return The casted object in Schema.Field.Type.
*/
public static Object autoCastField(Schema.Field field, Object rawObj) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
index 735315224616..1b0532c179a5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java
@@ -86,7 +86,8 @@ public void processElement(ProcessContext ctx) {
// Say for Row R, there are m instances on left and n instances on right,
// INTERSECT ALL outputs MIN(m, n) instances of R.
- Iterator<Row> iter = (leftCount <= rightCount) ? leftRows.iterator() : rightRows.iterator();
+ Iterator<Row> iter =
+ (leftCount <= rightCount) ? leftRows.iterator() : rightRows.iterator();
while (iter.hasNext()) {
ctx.output(iter.next());
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java
index e338ea9cd9d5..195ac9604a48 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BeamBuiltinFunctionProvider.java
@@ -27,8 +27,7 @@
public abstract class BeamBuiltinFunctionProvider {
public Map<String, List<Method>> getBuiltinMethods() {
List<Method> methods = Arrays.asList(getClass().getMethods());
- return methods
- .stream()
+ return methods.stream()
.filter(BeamBuiltinFunctionProvider::isUDF)
.collect(
Collectors.groupingBy(method -> method.getDeclaredAnnotation(UDF.class).funcName()));
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
index 0919ea72a549..97d3ffc76d29 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinStringFunctions.java
@@ -35,10 +35,9 @@
// return null for boolean is not allowed.
// TODO: handle null input.
@UDF(
- funcName = "ENDS_WITH",
- parameterArray = {TypeName.STRING},
- returnType = TypeName.STRING
- )
+ funcName = "ENDS_WITH",
+ parameterArray = {TypeName.STRING},
+ returnType = TypeName.STRING)
public Boolean endsWith(String str1, String str2) {
return str1.endsWith(str2);
}
@@ -47,19 +46,17 @@ public Boolean endsWith(String str1, String str2) {
// return null for boolean is not allowed.
// TODO: handle null input.
@UDF(
- funcName = "STARTS_WITH",
- parameterArray = {TypeName.STRING},
- returnType = TypeName.STRING
- )
+ funcName = "STARTS_WITH",
+ parameterArray = {TypeName.STRING},
+ returnType = TypeName.STRING)
public Boolean startsWith(String str1, String str2) {
return str1.startsWith(str2);
}
@UDF(
- funcName = "LENGTH",
- parameterArray = {TypeName.STRING},
- returnType = TypeName.INT64
- )
+ funcName = "LENGTH",
+ parameterArray = {TypeName.STRING},
+ returnType = TypeName.INT64)
public Long lengthString(String str) {
if (str == null) {
return null;
@@ -68,10 +65,9 @@ public Long lengthString(String str) {
}
@UDF(
- funcName = "LENGTH",
- parameterArray = {TypeName.BYTES},
- returnType = TypeName.INT64
- )
+ funcName = "LENGTH",
+ parameterArray = {TypeName.BYTES},
+ returnType = TypeName.INT64)
public Long lengthBytes(byte[] bytes) {
if (bytes == null) {
return null;
@@ -80,10 +76,9 @@ public Long lengthBytes(byte[] bytes) {
}
@UDF(
- funcName = "REVERSE",
- parameterArray = {TypeName.STRING},
- returnType = TypeName.STRING
- )
+ funcName = "REVERSE",
+ parameterArray = {TypeName.STRING},
+ returnType = TypeName.STRING)
public String reverseString(String str) {
if (str == null) {
return null;
@@ -92,10 +87,9 @@ public String reverseString(String str) {
}
@UDF(
- funcName = "REVERSE",
- parameterArray = {TypeName.BYTES},
- returnType = TypeName.BYTES
- )
+ funcName = "REVERSE",
+ parameterArray = {TypeName.BYTES},
+ returnType = TypeName.BYTES)
public byte[] reverseBytes(byte[] bytes) {
if (bytes == null) {
return null;
@@ -106,10 +100,9 @@ public String reverseString(String str) {
}
@UDF(
- funcName = "FROM_HEX",
- parameterArray = {TypeName.STRING},
- returnType = TypeName.BYTES
- )
+ funcName = "FROM_HEX",
+ parameterArray = {TypeName.STRING},
+ returnType = TypeName.BYTES)
public byte[] fromHex(String str) {
if (str == null) {
return null;
@@ -123,10 +116,9 @@ public String reverseString(String str) {
}
@UDF(
- funcName = "TO_HEX",
- parameterArray = {TypeName.BYTES},
- returnType = TypeName.STRING
- )
+ funcName = "TO_HEX",
+ parameterArray = {TypeName.BYTES},
+ returnType = TypeName.STRING)
public String toHex(byte[] bytes) {
if (bytes == null) {
return null;
@@ -136,19 +128,17 @@ public String toHex(byte[] bytes) {
}
@UDF(
- funcName = "LPAD",
- parameterArray = {TypeName.STRING, TypeName.INT64},
- returnType = TypeName.STRING
- )
+ funcName = "LPAD",
+ parameterArray = {TypeName.STRING, TypeName.INT64},
+ returnType = TypeName.STRING)
public String lpad(String originalValue, Long returnLength) {
return lpad(originalValue, returnLength, " ");
}
@UDF(
- funcName = "LPAD",
- parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING},
- returnType = TypeName.STRING
- )
+ funcName = "LPAD",
+ parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING},
+ returnType = TypeName.STRING)
public String lpad(String originalValue, Long returnLength, String pattern) {
if (originalValue == null || returnLength == null || pattern == null) {
return null;
@@ -169,19 +159,17 @@ public String lpad(String originalValue, Long returnLength, String pattern) {
}
@UDF(
- funcName = "LPAD",
- parameterArray = {TypeName.BYTES, TypeName.INT64},
- returnType = TypeName.BYTES
- )
+ funcName = "LPAD",
+ parameterArray = {TypeName.BYTES, TypeName.INT64},
+ returnType = TypeName.BYTES)
public byte[] lpad(byte[] originalValue, Long returnLength) {
return lpad(originalValue, returnLength, " ".getBytes(UTF_8));
}
@UDF(
- funcName = "LPAD",
- parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES},
- returnType = TypeName.BYTES
- )
+ funcName = "LPAD",
+ parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES},
+ returnType = TypeName.BYTES)
public byte[] lpad(byte[] originalValue, Long returnLength, byte[] pattern) {
if (originalValue == null || returnLength == null || pattern == null) {
return null;
@@ -214,19 +202,17 @@ public String lpad(String originalValue, Long returnLength, String pattern) {
}
@UDF(
- funcName = "RPAD",
- parameterArray = {TypeName.STRING, TypeName.INT64},
- returnType = TypeName.STRING
- )
+ funcName = "RPAD",
+ parameterArray = {TypeName.STRING, TypeName.INT64},
+ returnType = TypeName.STRING)
public String rpad(String originalValue, Long returnLength) {
return lpad(originalValue, returnLength, " ");
}
@UDF(
- funcName = "RPAD",
- parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING},
- returnType = TypeName.STRING
- )
+ funcName = "RPAD",
+ parameterArray = {TypeName.STRING, TypeName.INT64, TypeName.STRING},
+ returnType = TypeName.STRING)
public String rpad(String originalValue, Long returnLength, String pattern) {
if (originalValue == null || returnLength == null || pattern == null) {
return null;
@@ -247,19 +233,17 @@ public String rpad(String originalValue, Long returnLength, String pattern) {
}
@UDF(
- funcName = "RPAD",
- parameterArray = {TypeName.BYTES, TypeName.INT64},
- returnType = TypeName.BYTES
- )
+ funcName = "RPAD",
+ parameterArray = {TypeName.BYTES, TypeName.INT64},
+ returnType = TypeName.BYTES)
public byte[] rpad(byte[] originalValue, Long returnLength) {
return lpad(originalValue, returnLength, " ".getBytes(UTF_8));
}
@UDF(
- funcName = "RPAD",
- parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES},
- returnType = TypeName.BYTES
- )
+ funcName = "RPAD",
+ parameterArray = {TypeName.BYTES, TypeName.INT64, TypeName.BYTES},
+ returnType = TypeName.BYTES)
public byte[] rpad(byte[] originalValue, Long returnLength, byte[] pattern) {
if (originalValue == null || returnLength == null || pattern == null) {
return null;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
index 9049284d4db7..9a5f8ab1a550 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/BuiltinTrigonometricFunctions.java
@@ -31,10 +31,9 @@
*/
// TODO: handle overflow
@UDF(
- funcName = "COSH",
- parameterArray = {Schema.TypeName.DOUBLE},
- returnType = Schema.TypeName.DOUBLE
- )
+ funcName = "COSH",
+ parameterArray = {Schema.TypeName.DOUBLE},
+ returnType = Schema.TypeName.DOUBLE)
public Double cosh(Double o) {
if (o == null) {
return null;
@@ -49,10 +48,9 @@ public Double cosh(Double o) {
*/
// TODO: handle overflow
@UDF(
- funcName = "SINH",
- parameterArray = {Schema.TypeName.DOUBLE},
- returnType = Schema.TypeName.DOUBLE
- )
+ funcName = "SINH",
+ parameterArray = {Schema.TypeName.DOUBLE},
+ returnType = Schema.TypeName.DOUBLE)
public Double sinh(Double o) {
if (o == null) {
return null;
@@ -66,10 +64,9 @@ public Double sinh(Double o) {
* <p>Computes hyperbolic tangent of X. Does not fail.
*/
@UDF(
- funcName = "TANH",
- parameterArray = {Schema.TypeName.DOUBLE},
- returnType = Schema.TypeName.DOUBLE
- )
+ funcName = "TANH",
+ parameterArray = {Schema.TypeName.DOUBLE},
+ returnType = Schema.TypeName.DOUBLE)
public Double tanh(Double o) {
if (o == null) {
return null;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java
index 9e13cc85630f..c441303d28ef 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsInf.java
@@ -33,19 +33,17 @@
private static final String SQL_FUNCTION_NAME = "IS_INF";
@UDF(
- funcName = SQL_FUNCTION_NAME,
- parameterArray = {Schema.TypeName.DOUBLE},
- returnType = Schema.TypeName.BOOLEAN
- )
+ funcName = SQL_FUNCTION_NAME,
+ parameterArray = {Schema.TypeName.DOUBLE},
+ returnType = Schema.TypeName.BOOLEAN)
public Boolean isInf(Double value) {
return Double.isInfinite(value);
}
@UDF(
- funcName = SQL_FUNCTION_NAME,
- parameterArray = {Schema.TypeName.FLOAT},
- returnType = Schema.TypeName.BOOLEAN
- )
+ funcName = SQL_FUNCTION_NAME,
+ parameterArray = {Schema.TypeName.FLOAT},
+ returnType = Schema.TypeName.BOOLEAN)
public Boolean isInf(Float value) {
return Float.isInfinite(value);
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java
index 6bc1d31a2a68..f2bee2c2ccae 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udf/IsNan.java
@@ -32,19 +32,17 @@
private static final String SQL_FUNCTION_NAME = "IS_NAN";
@UDF(
- funcName = SQL_FUNCTION_NAME,
- parameterArray = {Schema.TypeName.FLOAT},
- returnType = Schema.TypeName.BOOLEAN
- )
+ funcName = SQL_FUNCTION_NAME,
+ parameterArray = {Schema.TypeName.FLOAT},
+ returnType = Schema.TypeName.BOOLEAN)
public Boolean isNan(Float value) {
return Float.isNaN(value);
}
@UDF(
- funcName = SQL_FUNCTION_NAME,
- parameterArray = {Schema.TypeName.DOUBLE},
- returnType = Schema.TypeName.BOOLEAN
- )
+ funcName = SQL_FUNCTION_NAME,
+ parameterArray = {Schema.TypeName.DOUBLE},
+ returnType = Schema.TypeName.BOOLEAN)
public Boolean isNan(Double value) {
return Double.isNaN(value);
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index b227f88967c7..cbc23f0dd013 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -202,7 +202,7 @@ private static RelDataType toRelDataType(
* @return
*/
public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Type rawType) {
- //For Joda time types, return SQL type for java.util.Date.
+ // For Joda time types, return SQL type for java.util.Date.
if (rawType instanceof Class && AbstractInstant.class.isAssignableFrom((Class<?>) rawType)) {
return typeFactory.createJavaType(Date.class);
} else if (rawType instanceof Class && ByteString.class.isAssignableFrom((Class<?>) rawType)) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
index 1bb8dee3ba43..d7703808fc6a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
@@ -95,9 +95,7 @@ public void processElement(ProcessContext context) {
* payload, and attributes.
*/
private List<Object> getFieldValues(ProcessContext context) {
- return messageSchema()
- .getFields()
- .stream()
+ return messageSchema().getFields().stream()
.map(field -> getValueForField(field, context.timestamp(), context.element()))
.collect(toList());
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index 146f5cf8fa8c..3689ba831c50 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -82,9 +82,7 @@ public void dropTable(String tableName) {
@Override
public Map<String, Table> getTables() {
- return tables()
- .entrySet()
- .stream()
+ return tables().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().table));
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java
index 9ba719332956..d1336002aad5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java
@@ -75,8 +75,7 @@ public static Schema buildBeamSqlSchema(Object... args) {
* }</pre>
*/
public static List<Row> buildRows(Schema type, List<?> rowsValues) {
- return Lists.partition(rowsValues, type.getFieldCount())
- .stream()
+ return Lists.partition(rowsValues, type.getFieldCount()).stream()
.map(values -> values.stream().collect(toRow(type)))
.collect(toList());
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index 3c462952e6ad..28f3f752443d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -60,14 +60,14 @@
static List<Row> rowsOfBytes;
static List<Row> rowsOfBytesPaddingTest;
- //bounded PCollections
+ // bounded PCollections
protected PCollection<Row> boundedInput1;
protected PCollection<Row> boundedInput2;
protected PCollection<Row> boundedInputFloatDouble;
protected PCollection<Row> boundedInputBytes;
protected PCollection<Row> boundedInputBytesPaddingTest;
- //unbounded PCollections
+ // unbounded PCollections
protected PCollection<Row> unboundedInput1;
protected PCollection<Row> unboundedInput2;
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
index 04ae5f17e6b8..f69f3b985a4b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java
@@ -164,9 +164,7 @@ private static SqlOperatorId sqlOperatorId(SqlOperator sqlOperator) {
Set<SqlOperatorId> declaredOperators = new HashSet<>();
declaredOperators.addAll(
- SqlStdOperatorTable.instance()
- .getOperatorList()
- .stream()
+ SqlStdOperatorTable.instance().getOperatorList().stream()
.map(operator -> sqlOperatorId(operator.getName(), operator.getKind()))
.collect(Collectors.toList()));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index 7582aa4d8f16..a7525d2c61ef 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -197,8 +197,7 @@ public void testSelectsFromExistingTable() throws Exception {
connection.createStatement().executeQuery("SELECT id, name FROM person");
List<Row> resultRows =
- readResultSet(selectResult)
- .stream()
+ readResultSet(selectResult).stream()
.map(values -> values.stream().collect(toRow(BASIC_SCHEMA)))
.collect(Collectors.toList());
@@ -322,8 +321,7 @@ public void testSelectsFromExistingComplexTable() throws Exception {
.executeQuery("SELECT person.nestedRow.id, person.nestedRow.name FROM person");
List<Row> resultRows =
- readResultSet(selectResult)
- .stream()
+ readResultSet(selectResult).stream()
.map(values -> values.stream().collect(toRow(BASIC_SCHEMA)))
.collect(Collectors.toList());
@@ -350,8 +348,7 @@ public void testInsertIntoCreatedTable() throws Exception {
connection.createStatement().executeQuery("SELECT id, name FROM person");
List<Row> resultRows =
- readResultSet(selectResult)
- .stream()
+ readResultSet(selectResult).stream()
.map(resultValues -> resultValues.stream().collect(toRow(BASIC_SCHEMA)))
.collect(Collectors.toList());
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
index acc7633fa4f6..39b5f0df014b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLNestedTypesTest.java
@@ -119,10 +119,7 @@ private String unparseMap(FieldType fieldType) {
private String unparseRow(FieldType fieldType) {
return "ROW<"
- + fieldType
- .getRowSchema()
- .getFields()
- .stream()
+ + fieldType.getRowSchema().getFields().stream()
.map(field -> field.getName() + " " + unparse(field.getType()))
.collect(joining(","))
+ ">";
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
index 7eea6ece2896..02349a558c43 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
@@ -45,9 +45,7 @@ public void setUp() {
Map<String, RelDataType> calciteRowTypeFields(Schema schema) {
final RelDataType dataType = CalciteUtils.toCalciteRowType(schema, dataTypeFactory);
- return dataType
- .getFieldNames()
- .stream()
+ return dataType.getFieldNames().stream()
.collect(
Collectors.toMap(
x -> x,
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index 062700ba0fe6..e8fa1358600d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -340,9 +340,8 @@ private CalciteConnection connect(PipelineOptions options, TableProvider... tabl
// The actual options are in the "options" field of the converted map
Map<String, String> argsMap =
((Map<String, Object>) MAPPER.convertValue(pipeline.getOptions(), Map.class).get("options"))
- .entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, entry -> toArg(entry.getValue())));
+ .entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> toArg(entry.getValue())));
InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
for (TableProvider tableProvider : tableProviders) {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
index ef5c40eaf7b9..28fc4a406c46 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
@@ -71,7 +71,9 @@
static class Factory<InputT, RestrictionT, OutputT>
extends DoFnPTransformRunnerFactory<
- KV<InputT, RestrictionT>, InputT, OutputT,
+ KV<InputT, RestrictionT>,
+ InputT,
+ OutputT,
SplittableProcessElementsRunner<InputT, RestrictionT, OutputT>> {
@Override
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
index d52ad469ef01..32797fa9da3e 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
@@ -246,7 +246,7 @@ public void testBasicInboundConsumerBehaviour() throws Exception {
@Test(timeout = 10000)
public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() throws Exception {
CountDownLatch waitForClientToConnect = new CountDownLatch(1);
- //Collection<WindowedValue<String>> inboundValuesA = new ConcurrentLinkedQueue<>();
+ // Collection<WindowedValue<String>> inboundValuesA = new ConcurrentLinkedQueue<>();
Collection<WindowedValue<String>> inboundValuesB = new ConcurrentLinkedQueue<>();
Collection<BeamFnApi.Elements> inboundServerValues = new ConcurrentLinkedQueue<>();
AtomicReference<StreamObserver<BeamFnApi.Elements>> outboundServerObserver =
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
index 908e31c77f36..a1ac65ecec4a 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
@@ -241,10 +241,9 @@ public SSEAwsKeyManagementParams deserialize(JsonParser parser, DeserializationC
}
@JsonAutoDetect(
- fieldVisibility = Visibility.NONE,
- getterVisibility = Visibility.NONE,
- setterVisibility = Visibility.NONE
- )
+ fieldVisibility = Visibility.NONE,
+ getterVisibility = Visibility.NONE,
+ setterVisibility = Visibility.NONE)
interface ClientConfigurationMixin {
@JsonProperty
String getProxyHost();
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
index 50a10a357a3c..1276a79c6ddf 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
@@ -596,8 +596,7 @@ protected void rename(
@Override
protected void delete(Collection<S3ResourceId> resourceIds) throws IOException {
List<S3ResourceId> nonDirectoryPaths =
- resourceIds
- .stream()
+ resourceIds.stream()
.filter(s3ResourceId -> !s3ResourceId.isDirectory())
.collect(Collectors.toList());
Multimap<String, String> keysByBucket = ArrayListMultimap.create();
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
index df706082d7db..449dd1e3af16 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/sns/SnsIO.java
@@ -82,7 +82,7 @@
@Experimental(Experimental.Kind.SOURCE_SINK)
public final class SnsIO {
- //Write data tp SNS
+ // Write data tp SNS
public static Write write() {
return new AutoValue_SnsIO_Write.Builder().build();
}
@@ -283,7 +283,7 @@ public PCollectionTuple expand(PCollection<PublishRequest> input) {
@Setup
public void setup() throws Exception {
- //Initialize SnsPublisher
+ // Initialize SnsPublisher
producer = spec.getAWSClientsProvider().createSnsPublisher();
checkArgument(
topicExists(producer, spec.getTopicName()),
diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
index db637a4a2afe..46640e39ab96 100644
--- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
+++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
@@ -696,19 +696,19 @@ public void testWriteAndRead() throws IOException {
ByteBuffer bb = ByteBuffer.allocate(writtenArray.length);
bb.put(writtenArray);
- //First create an object and write data to it
+ // First create an object and write data to it
S3ResourceId path = S3ResourceId.fromUri("s3://testbucket/foo/bar.txt");
WritableByteChannel writableByteChannel =
s3FileSystem.create(path, builder().setMimeType("application/text").build());
writableByteChannel.write(bb);
writableByteChannel.close();
- //Now read the same object
+ // Now read the same object
ByteBuffer bb2 = ByteBuffer.allocate(writtenArray.length);
ReadableByteChannel open = s3FileSystem.open(path);
open.read(bb2);
- //And compare the content with the one that was written
+ // And compare the content with the one that was written
byte[] readArray = bb2.array();
assertArrayEquals(readArray, writtenArray);
open.close();
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
index e648112820a1..ee31292fe4a9 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.java
@@ -214,21 +214,14 @@ static long getEstimatedSizeBytes(List<TokenRange> tokenRanges) {
SplitGenerator splitGenerator = new SplitGenerator(cluster.getMetadata().getPartitioner());
List<BigInteger> tokens =
- cluster
- .getMetadata()
- .getTokenRanges()
- .stream()
+ cluster.getMetadata().getTokenRanges().stream()
.map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
.collect(Collectors.toList());
List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits, tokens);
LOG.info("{} splits were actually generated", splits.size());
final String partitionKey =
- cluster
- .getMetadata()
- .getKeyspace(spec.keyspace())
- .getTable(spec.table())
- .getPartitionKey()
+ cluster.getMetadata().getKeyspace(spec.keyspace()).getTable(spec.table()).getPartitionKey()
.stream()
.map(ColumnMetadata::getName)
.collect(Collectors.joining(","));
diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
index 0eaca4f58121..da5e54dd0389 100644
--- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
+++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
@@ -366,9 +366,7 @@ private static void set(Properties properties, String param, Object value) {
@VisibleForTesting
static String insertSql(TableSchema schema, String table) {
String columnsStr =
- schema
- .columns()
- .stream()
+ schema.columns().stream()
.filter(x -> !x.materializedOrAlias())
.map(x -> quoteIdentifier(x.name()))
.collect(Collectors.joining(", "));
diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
index b8b1e3bbb651..75692d2859b4 100644
--- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
+++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
@@ -45,9 +45,7 @@ public static TableSchema of(Column... columns) {
* @return Beam schema
*/
public static Schema getEquivalentSchema(TableSchema tableSchema) {
- return tableSchema
- .columns()
- .stream()
+ return tableSchema.columns().stream()
.map(
x -> {
if (x.columnType().nullable()) {
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 55788b007bca..9a6da467928b 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -53,7 +53,7 @@
private static Node node;
private static RestClient restClient;
private static ConnectionConfiguration connectionConfiguration;
- //cannot use inheritance because ES5 test already extends ESIntegTestCase.
+ // cannot use inheritance because ES5 test already extends ESIntegTestCase.
private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 24748b9e4637..806b93e307d0 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -73,7 +73,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
- //useful to have updated sizes for getEstimatedSize
+ // useful to have updated sizes for getEstimatedSize
.put("index.store.stats_refresh_interval", 0)
.build();
}
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index d731f339424a..0579cfa115c0 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -73,7 +73,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
- //useful to have updated sizes for getEstimatedSize
+ // useful to have updated sizes for getEstimatedSize
.put("index.store.stats_refresh_interval", 0)
.build();
}
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 579c968adf46..129619eb8521 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -185,9 +185,9 @@ void testRead() throws Exception {
pipeline.apply(
ElasticsearchIO.read()
.withConnectionConfiguration(connectionConfiguration)
- //set to default value, useful just to test parameter passing.
+ // set to default value, useful just to test parameter passing.
.withScrollKeepalive("5m")
- //set to default value, useful just to test parameter passing.
+ // set to default value, useful just to test parameter passing.
.withBatchSize(100L));
PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(numDocs);
pipeline.run();
@@ -594,7 +594,8 @@ void testDefaultRetryPredicate(RestClient restClient) throws IOException {
*/
void testWriteRetry() throws Throwable {
expectedException.expectCause(isA(IOException.class));
- // max attempt is 3, but retry is 2 which excludes 1st attempt when error was identified and retry started.
+ // max attempt is 3, but retry is 2 which excludes 1st attempt when error was identified and
+ // retry started.
expectedException.expectMessage(
String.format(ElasticsearchIO.Write.WriteFn.RETRY_FAILED_LOG, EXPECTED_RETRIES));
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 7c30563709ee..9956684c269b 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -188,7 +188,7 @@ static void checkForErrors(HttpEntity responseEntity, int backendVersion) throws
StringBuilder errorMessages =
new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
JsonNode items = searchResult.path("items");
- //some items present in bulk might have errors, concatenate error messages
+ // some items present in bulk might have errors, concatenate error messages
for (JsonNode item : items) {
String errorRootName = "";
@@ -572,7 +572,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
@Nullable private final Integer numSlices;
@Nullable private final Integer sliceId;
- //constructor used in split() when we know the backend version
+ // constructor used in split() when we know the backend version
private BoundedElasticsearchSource(
Read spec,
@Nullable String shardPreference,
@@ -727,7 +727,7 @@ public boolean start() throws IOException {
if ((source.backendVersion == 5 || source.backendVersion == 6)
&& source.numSlices != null
&& source.numSlices > 1) {
- //if there is more than one slice, add the slice to the user query
+ // if there is more than one slice, add the slice to the user query
String sliceQuery =
String.format("\"slice\": {\"id\": %s,\"max\": %s}", source.sliceId, source.numSlices);
query = query.replaceFirst("\\{", "{" + sliceQuery + ",");
@@ -777,7 +777,7 @@ public boolean advance() throws IOException {
}
private boolean readNextBatchAndReturnFirstDocument(JsonNode searchResult) {
- //stop if no more data
+ // stop if no more data
JsonNode hits = searchResult.path("hits").path("hits");
if (hits.size() == 0) {
current = null;
@@ -1300,12 +1300,12 @@ private HttpEntity handleRetry(
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = retryBackoff.backoff();
int attempt = 0;
- //while retry policy exists
+ // while retry policy exists
while (BackOffUtils.next(sleeper, backoff)) {
LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt));
response = restClient.performRequest(method, endpoint, params, requestBody);
responseEntity = new BufferedHttpEntity(response.getEntity());
- //if response has no 429 errors
+ // if response has no 429 errors
if (!spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
return responseEntity;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index b4dba1b78bb9..54378fcf3636 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1604,8 +1604,7 @@ public WriteResult expand(PCollection<T> input) {
checkArgument(
1
== Iterables.size(
- allToArgs
- .stream()
+ allToArgs.stream()
.filter(Predicates.notNull()::apply)
.collect(Collectors.toList())),
"Exactly one of jsonTableRef, tableFunction, or " + "dynamicDestinations must be set");
@@ -1615,8 +1614,7 @@ public WriteResult expand(PCollection<T> input) {
checkArgument(
2
> Iterables.size(
- allSchemaArgs
- .stream()
+ allSchemaArgs.stream()
.filter(Predicates.notNull()::apply)
.collect(Collectors.toList())),
"No more than one of jsonSchema, schemaFromView, or dynamicDestinations may " + "be set");
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 00d5b570b824..a30465e61b9d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -914,7 +914,8 @@ private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws Inte
Transport.getJsonFactory(),
chainHttpRequestInitializer(
options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
+ // Do not log 404. It clutters the output and is possibly even required by the
+ // caller.
httpRequestInitializer))
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index e8fcf210a20b..b6d8b748c16e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -263,9 +263,7 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
.collect(toMap(i -> bqFields.get(i).getName(), i -> i));
List<Object> rawJsonValues =
- rowSchema
- .getFields()
- .stream()
+ rowSchema.getFields().stream()
.map(field -> bqFieldIndices.get(field.getName()))
.map(index -> jsonBqRow.getF().get(index).getV())
.collect(toList());
@@ -284,9 +282,9 @@ private static Object toBeamValue(FieldType fieldType, Object jsonBQValue) {
if (jsonBQValue instanceof List) {
return ((List<Object>) jsonBQValue)
.stream()
- .map(v -> ((Map<String, Object>) v).get("v"))
- .map(v -> toBeamValue(fieldType.getCollectionElementType(), v))
- .collect(toList());
+ .map(v -> ((Map<String, Object>) v).get("v"))
+ .map(v -> toBeamValue(fieldType.getCollectionElementType(), v))
+ .collect(toList());
}
throw new UnsupportedOperationException(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
index d96e8b3d0810..ed10c21ccb9d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
@@ -228,8 +228,7 @@ private void pollAndAssert(
return Collections.emptyList();
}
- return bqRows
- .stream()
+ return bqRows.stream()
.map(bqRow -> toBeamRow(rowSchema, bqSchema, bqRow))
.collect(Collectors.toList());
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 633355d8b24d..9277055ace31 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -75,7 +75,8 @@ public PubsubClient newClient(
Transport.getJsonFactory(),
chainHttpRequestInitializer(
options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
+ // Do not log 404. It clutters the output and is possibly even required by the
+ // caller.
new RetryHttpRequestInitializer(ImmutableList.of(404))))
.setRootUrl(options.getPubsubRootUrl())
.setApplicationName(options.getAppName())
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index ead5cb555b14..d09745dcdfdd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -672,7 +672,8 @@ public void testInsertOtherRetry() throws Throwable {
List<ValueInSingleWindow<TableRow>> rows = new ArrayList<>();
rows.add(wrapValue(new TableRow()));
- // First response is 403 non-{rate-limited, quota-exceeded}, second response has valid payload but should not
+ // First response is 403 non-{rate-limited, quota-exceeded}, second response has valid payload
+ // but should not
// be invoked.
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(403).thenReturn(200);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
index d976366ac4b0..8d6dcff19d04 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryToTableIT.java
@@ -149,9 +149,7 @@ private void verifyLegacyQueryRes(String outputTable) throws Exception {
BQ_CLIENT.queryWithRetries(String.format("SELECT fruit from [%s];", outputTable), project);
LOG.info("Finished to query result table {}", outputTable);
List<String> tableResult =
- response
- .getRows()
- .stream()
+ response.getRows().stream()
.flatMap(row -> row.getF().stream().map(cell -> cell.getV().toString()))
.sorted()
.collect(Collectors.toList());
@@ -171,9 +169,7 @@ private void verifyNewTypesQueryRes(String outputTable) throws Exception {
String.format("SELECT bytes, date, time FROM [%s];", outputTable), project);
LOG.info("Finished to query result table {}", outputTable);
List<String> tableResult =
- response
- .getRows()
- .stream()
+ response.getRows().stream()
.map(
row -> {
String res = "";
@@ -234,13 +230,13 @@ private void verifyStandardQueryRes(String outputTable) throws Exception {
@BeforeClass
public static void setupTestEnvironment() throws Exception {
PipelineOptionsFactory.register(BigQueryToTableOptions.class);
- project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+ project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
// Create one BQ dataset for all test cases.
BQ_CLIENT.createNewDataset(project, BIG_QUERY_DATASET_ID);
// Create table and insert data for new type query test cases.
BQ_CLIENT.createNewTable(
- project,
+ project,
BIG_QUERY_DATASET_ID,
new Table()
.setSchema(BigQueryToTableIT.NEW_TYPES_QUERY_TABLE_SCHEMA)
@@ -265,7 +261,7 @@ public static void cleanup() {
@Test
public void testLegacyQueryWithoutReshuffle() throws Exception {
final String outputTable =
- project + ":" + BIG_QUERY_DATASET_ID + "." + "testLegacyQueryWithoutReshuffle";
+ project + ":" + BIG_QUERY_DATASET_ID + "." + "testLegacyQueryWithoutReshuffle";
this.runBigQueryToTablePipeline(setupLegacyQueryTest(outputTable));
@@ -275,7 +271,7 @@ public void testLegacyQueryWithoutReshuffle() throws Exception {
@Test
public void testNewTypesQueryWithoutReshuffle() throws Exception {
final String outputTable =
- project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffle";
+ project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffle";
this.runBigQueryToTablePipeline(setupNewTypesQueryTest(outputTable));
@@ -285,7 +281,7 @@ public void testNewTypesQueryWithoutReshuffle() throws Exception {
@Test
public void testNewTypesQueryWithReshuffle() throws Exception {
final String outputTable =
- project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithReshuffle";
+ project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithReshuffle";
BigQueryToTableOptions options = setupNewTypesQueryTest(outputTable);
options.setReshuffle(true);
@@ -297,7 +293,7 @@ public void testNewTypesQueryWithReshuffle() throws Exception {
@Test
public void testStandardQueryWithoutCustom() throws Exception {
final String outputTable =
- project + ":" + BIG_QUERY_DATASET_ID + "." + "testStandardQueryWithoutCustom";
+ project + ":" + BIG_QUERY_DATASET_ID + "." + "testStandardQueryWithoutCustom";
this.runBigQueryToTablePipeline(setupStandardQueryTest(outputTable));
@@ -308,7 +304,7 @@ public void testStandardQueryWithoutCustom() throws Exception {
@Category(DataflowPortabilityApiUnsupported.class)
public void testNewTypesQueryWithoutReshuffleWithCustom() throws Exception {
final String outputTable =
- project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffleWithCustom";
+ project + ":" + BIG_QUERY_DATASET_ID + "." + "testNewTypesQueryWithoutReshuffleWithCustom";
BigQueryToTableOptions options = this.setupNewTypesQueryTest(outputTable);
options.setExperiments(
ImmutableList.of("enable_custom_bigquery_sink", "enable_custom_bigquery_source"));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 8924733795fc..e995e7af969e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -581,8 +581,7 @@ public void testReadingWithFilter() throws Exception {
String regex = ".*17.*";
final KeyMatchesRegex keyPredicate = new KeyMatchesRegex(regex);
Iterable<Row> filteredRows =
- testRows
- .stream()
+ testRows.stream()
.filter(
input -> {
verifyNotNull(input, "input");
@@ -710,7 +709,7 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception {
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
- //Construct few non contiguous key ranges [..1][1..2][3..4][4..5][6..7][8..9]
+ // Construct few non contiguous key ranges [..1][1..2][3..4][4..5][6..7][8..9]
List<ByteKeyRange> keyRanges =
Arrays.asList(
ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
@@ -720,7 +719,7 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception {
ByteKeyRange.of(createByteKey(6), createByteKey(7)),
ByteKeyRange.of(createByteKey(8), createByteKey(9)));
- //Expected ranges after split and reduction by maxSplitCount is [..2][3..5][6..7][8..9]
+ // Expected ranges after split and reduction by maxSplitCount is [..2][3..5][6..7][8..9]
List<ByteKeyRange> expectedKeyRangesAfterReducedSplits =
Arrays.asList(
ByteKeyRange.of(ByteKey.EMPTY, createByteKey(2)),
@@ -770,7 +769,7 @@ public void testReduceSplitsWithAllNonAdjacentRange() throws Exception {
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
- //Construct non contiguous key ranges [..1][2..3][4..5][6..7][8..9]
+ // Construct non contiguous key ranges [..1][2..3][4..5][6..7][8..9]
List<ByteKeyRange> keyRanges =
Arrays.asList(
ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
@@ -846,8 +845,8 @@ public void tesReduceSplitsWithAdjacentRanges() throws Exception {
splits.add(source.withSingleRange(range));
}
- //Splits Source have ranges [..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..]
- //expected reduced Split source ranges are [..4][4..8][8..]
+ // Splits Source have ranges [..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..]
+ // expected reduced Split source ranges are [..4][4..8][8..]
List<ByteKeyRange> expectedKeyRangesAfterReducedSplits =
Arrays.asList(
ByteKeyRange.of(ByteKey.EMPTY, createByteKey(4)),
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
index ed885555bd1f..a3ed49deeb77 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -233,7 +233,7 @@ static long countEntities(V1TestOptions options, String project, String ancestor
private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
// Number of times to retry on update failure
private static final int MAX_RETRIES = 5;
- //Initial backoff time for exponential backoff for retry attempts.
+ // Initial backoff time for exponential backoff for retry attempts.
private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(5);
// Returns true if a Datastore key is complete. A key is complete if its last element
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 13de3b51355c..6fe51d6a95d7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -354,8 +354,7 @@ public void evaluate() throws Throwable {
private <T> void setupTestClient(List<T> inputs, Coder<T> coder) {
List<IncomingMessage> messages =
- inputs
- .stream()
+ inputs.stream()
.map(
t -> {
try {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
index 3b92f140cf0a..5b4a7262a8b3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
@@ -530,9 +530,7 @@ public void testGatherBundleAndSortFn() throws Exception {
// Verify sorted output... first decode it...
List<MutationGroup> sorted =
- byteArrayKvListCaptor
- .getValue()
- .stream()
+ byteArrayKvListCaptor.getValue().stream()
.map(kv -> WriteGrouped.decode(kv.getValue()))
.collect(Collectors.toList());
assertThat(
@@ -586,8 +584,7 @@ public void testGatherBundleAndSortFn_flushOversizedBundle() throws Exception {
// decode list of lists of KV to a list of lists of MutationGroup.
List<List<MutationGroup>> mgListGroups =
- kvGroups
- .stream()
+ kvGroups.stream()
.map(
l ->
l.stream()
@@ -625,8 +622,7 @@ public void testBatchFn_cells() throws Exception {
g(m(2L)));
List<KV<byte[], byte[]>> encodedInput =
- mutationGroups
- .stream()
+ mutationGroups.stream()
.map(mg -> KV.of((byte[]) null, WriteGrouped.encode(mg)))
.collect(Collectors.toList());
@@ -671,8 +667,7 @@ public void testBatchFn_size() throws Exception {
g(m(2L)));
List<KV<byte[], byte[]>> encodedInput =
- mutationGroups
- .stream()
+ mutationGroups.stream()
.map(mg -> KV.of((byte[]) null, WriteGrouped.encode(mg)))
.collect(Collectors.toList());
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index a85cfd08b730..0bd556f092f8 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -190,7 +190,8 @@ protected void rename(
boolean success = fileSystem.rename(src, dest);
// If the failure was due to the file already existing, delete and retry (BEAM-5036).
- // This should be the exceptional case, so handle here rather than incur the overhead of testing first
+ // This should be the exceptional case, so handle here rather than incur the overhead of
+ // testing first
if (!success && fileSystem.exists(src) && fileSystem.exists(dest)) {
LOG.debug(
String.format(LOG_DELETING_EXISTING_FILE, Path.getPathWithoutSchemeAndAuthority(dest)));
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
index e5a7bf37751a..5a73ea12416c 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
@@ -152,7 +152,7 @@ private boolean tryCreateFile(Configuration conf, Path path) {
} catch (FileAlreadyExistsException | org.apache.hadoop.fs.FileAlreadyExistsException e) {
return false;
} catch (RemoteException e) {
- //remote hdfs exception
+ // remote hdfs exception
if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName())) {
return false;
}
diff --git a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
index 5cce85abd535..1d7fc32c203f 100644
--- a/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
+++ b/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
@@ -641,8 +641,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
"Generated {} splits. Size of first split is {} ",
inputSplits.size(),
inputSplits.get(0).getSplit().getLength());
- return inputSplits
- .stream()
+ return inputSplits.stream()
.map(
serializableInputSplit -> {
return new HadoopInputFormatBoundedSource<>(
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
index 04eadeaaf468..4b88e32ece6b 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOCassandraTest.java
@@ -177,7 +177,7 @@ private static void createCassandraData() {
@BeforeClass
public static void startCassandra() throws Exception {
- //Start the Embedded Cassandra Service
+ // Start the Embedded Cassandra Service
cassandra.start();
final SocketOptions socketOptions = new SocketOptions();
// Setting this to 0 disables read timeouts.
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
index 1a19dd937a5d..509cc46c8980 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOSequenceFileTest.java
@@ -93,8 +93,7 @@
KV.of(new Text(element.getKey()), new LongWritable(element.getValue()));
private static Map<String, Long> computeWordCounts(List<String> sentences) {
- return sentences
- .stream()
+ return sentences.stream()
.flatMap(s -> Stream.of(s.split("\\W+")))
.map(String::toLowerCase)
.collect(Collectors.toMap(Function.identity(), s -> 1L, Long::sum));
@@ -298,8 +297,7 @@ public void streamTest() {
}
private Map<String, Long> loadWrittenDataAsMap(String outputDirPath) {
- return loadWrittenData(outputDirPath)
- .stream()
+ return loadWrittenData(outputDirPath).stream()
.collect(
Collectors.toMap(
kv -> kv.getKey().toString(),
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java
index a40ab46e8871..c938b7df7343 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOTestOptions.java
@@ -24,7 +24,7 @@
/** Properties needed when using HadoopFormatIO with the Beam SDK. */
public interface HadoopFormatIOTestOptions extends TestPipelineOptions {
- //Cassandra test options
+ // Cassandra test options
@Description("Cassandra Server IP")
@Default.String("cassandraServerIp")
String getCassandraServerIp();
@@ -49,7 +49,7 @@
void setCassandraPassword(String cassandraPassword);
- //Elasticsearch test options
+ // Elasticsearch test options
@Description("Elasticsearch Server IP")
@Default.String("elasticServerIp")
String getElasticServerIp();
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
index 9d62f481add8..98db521b5324 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/TestEmployeeDataSet.java
@@ -68,11 +68,12 @@
public static List<KV<Text, Employee>> getEmployeeData() {
return (data.isEmpty() ? populateEmployeeData() : data)
.stream()
- .map(
- input -> {
- List<String> empData = Splitter.on('_').splitToList(input.getValue());
- return KV.of(new Text(input.getKey()), new Employee(empData.get(0), empData.get(1)));
- })
- .collect(Collectors.toList());
+ .map(
+ input -> {
+ List<String> empData = Splitter.on('_').splitToList(input.getValue());
+ return KV.of(
+ new Text(input.getKey()), new Employee(empData.get(0), empData.get(1)));
+ })
+ .collect(Collectors.toList());
}
}
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
index f2a1eae936d4..02732611b7d7 100644
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
+++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
@@ -175,7 +175,7 @@ private static void createCassandraData() {
@BeforeClass
public static void startCassandra() throws Exception {
- //Start the Embedded Cassandra Service
+ // Start the Embedded Cassandra Service
cassandra.start();
final SocketOptions socketOptions = new SocketOptions();
// Setting this to 0 disables read timeouts.
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java
index 7c571b10df67..ade544fc22bd 100644
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java
+++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFITestOptions.java
@@ -24,7 +24,7 @@
/** Properties needed when using HadoopInputFormatIO with the Beam SDK. */
public interface HIFITestOptions extends TestPipelineOptions {
- //Cassandra test options
+ // Cassandra test options
@Description("Cassandra Server IP")
@Default.String("cassandraServerIp")
String getCassandraServerIp();
@@ -49,7 +49,7 @@
void setCassandraPassword(String cassandraPassword);
- //Elasticsearch test options
+ // Elasticsearch test options
@Description("Elasticsearch Server IP")
@Default.String("elasticServerIp")
String getElasticServerIp();
diff --git a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
index 65aa5ee12ceb..6f91328f5d13 100644
--- a/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
+++ b/sdks/java/io/hadoop-input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java
@@ -68,11 +68,12 @@
public static List<KV<Text, Employee>> getEmployeeData() {
return (data.isEmpty() ? populateEmployeeData() : data)
.stream()
- .map(
- input -> {
- List<String> empData = Splitter.on('_').splitToList(input.getValue());
- return KV.of(new Text(input.getKey()), new Employee(empData.get(0), empData.get(1)));
- })
- .collect(Collectors.toList());
+ .map(
+ input -> {
+ List<String> empData = Splitter.on('_').splitToList(input.getValue());
+ return KV.of(
+ new Text(input.getKey()), new Employee(empData.get(0), empData.get(1)));
+ })
+ .collect(Collectors.toList());
}
}
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index d2ff9b9a3c43..c742185ca5ab 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -276,8 +276,8 @@ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Except
desiredSplitCount = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes);
}
ReaderContext readerContext = getReaderContext(desiredSplitCount);
- //process the splits returned by native API
- //this could be different from 'desiredSplitCount' calculated above
+ // process the splits returned by native API
+ // this could be different from 'desiredSplitCount' calculated above
LOG.info(
"Splitting into bundles of {} bytes: "
+ "estimated size {}, desired split count {}, actual split count {}",
@@ -486,7 +486,7 @@ private void flush() throws HCatException {
masterWriter.commit(writerContext);
} catch (HCatException e) {
LOG.error("Exception in flush - write/commit data to Hive", e);
- //abort on exception
+ // abort on exception
masterWriter.abort(writerContext);
throw e;
} finally {
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
index 5121e39b3f07..fb83c0060f49 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java
@@ -22,6 +22,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+
/** Helper for creating connection and test tables on hive database via JDBC driver. */
class HiveDatabaseTestHelper {
private static Connection con;
With regards,
Apache Git Services