You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:41:07 UTC
[36/50] [abbrv] incubator-beam git commit: [BEAM-242] Enable
checkstyle and fix checkstyle errors in Flink runner
[BEAM-242] Enable checkstyle and fix checkstyle errors in Flink runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5eb44aa0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5eb44aa0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5eb44aa0
Branch: refs/heads/gearpump-runner
Commit: 5eb44aa01157ca62f1a618d1738eb064ca3a10e4
Parents: 9ae5cc7
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu Aug 25 16:19:54 2016 +0200
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:12 2016 -0700
----------------------------------------------------------------------
runners/flink/runner/pom.xml | 2 -
.../FlinkPipelineExecutionEnvironment.java | 17 +-
.../runners/flink/FlinkPipelineOptions.java | 27 ++--
.../runners/flink/FlinkRunnerRegistrar.java | 6 +
.../beam/runners/flink/FlinkRunnerResult.java | 17 +-
.../beam/runners/flink/TestFlinkRunner.java | 8 +-
.../apache/beam/runners/flink/package-info.java | 22 +++
.../FlinkBatchPipelineTranslator.java | 15 +-
.../FlinkBatchTranslationContext.java | 10 +-
.../translation/FlinkPipelineTranslator.java | 2 +-
.../FlinkStreamingTransformTranslators.java | 5 +-
.../flink/translation/TranslationMode.java | 8 +-
.../translation/functions/package-info.java | 22 +++
.../runners/flink/translation/package-info.java | 22 +++
.../translation/types/CoderTypeSerializer.java | 2 +-
.../types/EncodedValueSerializer.java | 162 ++++++++++---------
.../flink/translation/types/package-info.java | 22 +++
.../utils/SerializedPipelineOptions.java | 2 +-
.../flink/translation/utils/package-info.java | 22 +++
.../wrappers/DataOutputViewWrapper.java | 2 +-
.../translation/wrappers/package-info.java | 22 +++
.../wrappers/streaming/DoFnOperator.java | 12 +-
.../streaming/SingletonKeyedWorkItem.java | 5 +
.../streaming/SingletonKeyedWorkItemCoder.java | 14 +-
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
.../wrappers/streaming/WorkItemKeySelector.java | 3 +-
.../streaming/io/UnboundedFlinkSink.java | 13 +-
.../streaming/io/UnboundedFlinkSource.java | 29 ++--
.../streaming/io/UnboundedSocketSource.java | 46 ++++--
.../wrappers/streaming/io/package-info.java | 22 +++
.../wrappers/streaming/package-info.java | 22 +++
.../beam/runners/flink/PipelineOptionsTest.java | 3 +
.../beam/runners/flink/WriteSinkITCase.java | 3 +-
.../apache/beam/runners/flink/package-info.java | 22 +++
.../streaming/FlinkStateInternalsTest.java | 3 +-
.../flink/streaming/GroupByNullKeyTest.java | 6 +
.../streaming/TopWikipediaSessionsITCase.java | 2 +-
.../streaming/UnboundedSourceWrapperTest.java | 33 ++--
.../runners/flink/streaming/package-info.java | 22 +++
39 files changed, 490 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 08adc60..7c32280 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -234,12 +234,10 @@
</executions>
</plugin>
- <!-- Checkstyle errors for now
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
- -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index d1977a4..a5d33b4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -37,14 +37,15 @@ import org.slf4j.LoggerFactory;
/**
* The class that instantiates and manages the execution of a given job.
* Depending on if the job is a Streaming or Batch processing one, it creates
- * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}),
- * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or
- * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and
- * executes the (translated) job.
+ * the adequate execution environment ({@link ExecutionEnvironment}
+ * or {@link StreamExecutionEnvironment}), the necessary {@link FlinkPipelineTranslator}
+ * ({@link FlinkBatchPipelineTranslator} or {@link FlinkStreamingPipelineTranslator}) to
+ * transform the Beam job into a Flink one, and executes the (translated) job.
*/
public class FlinkPipelineExecutionEnvironment {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
private final FlinkPipelineOptions options;
@@ -79,8 +80,8 @@ public class FlinkPipelineExecutionEnvironment {
* Depending on if the job is a Streaming or a Batch one, this method creates
* the necessary execution environment and pipeline translator, and translates
* the {@link org.apache.beam.sdk.values.PCollection} program into
- * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream}
- * one.
+ * a {@link org.apache.flink.api.java.DataSet}
+ * or {@link org.apache.flink.streaming.api.datastream.DataStream} one.
* */
public void translate(Pipeline pipeline) {
this.flinkBatchEnv = null;
@@ -213,7 +214,7 @@ public class FlinkPipelineExecutionEnvironment {
// If the value is not -1, then the validity checks are applied.
// By default, checkpointing is disabled.
long checkpointInterval = options.getCheckpointingInterval();
- if(checkpointInterval != -1) {
+ if (checkpointInterval != -1) {
if (checkpointInterval < 1) {
throw new IllegalArgumentException("The checkpoint interval must be positive");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 6561fa5..1fb23ec 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -29,7 +29,8 @@ import org.apache.beam.sdk.options.StreamingOptions;
/**
* Options which can be used to configure a Flink PipelineRunner.
*/
-public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
+public interface FlinkPipelineOptions
+ extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
/**
* List of local files to make available to workers.
@@ -38,8 +39,8 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
* <p>
* The default value is the list of jars from the main program's classpath.
*/
- @Description("Jar-Files to send to all workers and put on the classpath. " +
- "The default value is all files from the classpath.")
+ @Description("Jar-Files to send to all workers and put on the classpath. "
+ + "The default value is all files from the classpath.")
@JsonIgnore
List<String> getFilesToStage();
void setFilesToStage(List<String> value);
@@ -51,9 +52,9 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
* Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
* "[auto]" will let the system decide where to execute the pipeline based on the environment.
*/
- @Description("Address of the Flink Master where the Pipeline should be executed. Can" +
- " either be of the form \"host:port\" or one of the special values [local], " +
- "[collection] or [auto].")
+ @Description("Address of the Flink Master where the Pipeline should be executed. Can"
+ + " either be of the form \"host:port\" or one of the special values [local], "
+ + "[collection] or [auto].")
String getFlinkMaster();
void setFlinkMaster(String value);
@@ -62,21 +63,23 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
Integer getParallelism();
void setParallelism(Integer value);
- @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " +
- "fault tolerance).")
+ @Description("The interval between consecutive checkpoints (i.e. snapshots of the current"
+ + "pipeline state used for fault tolerance).")
@Default.Long(-1L)
Long getCheckpointingInterval();
void setCheckpointingInterval(Long interval);
- @Description("Sets the number of times that failed tasks are re-executed. " +
- "A value of zero effectively disables fault tolerance. A value of -1 indicates " +
- "that the system default value (as defined in the configuration) should be used.")
+ @Description("Sets the number of times that failed tasks are re-executed. "
+ + "A value of zero effectively disables fault tolerance. A value of -1 indicates "
+ + "that the system default value (as defined in the configuration) should be used.")
@Default.Integer(-1)
Integer getNumberOfExecutionRetries();
void setNumberOfExecutionRetries(Integer retries);
- @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.")
+ @Description("Sets the delay between executions. A value of {@code -1} "
+ + "indicates that the default value should be used.")
@Default.Long(-1L)
Long getExecutionRetryDelay();
void setExecutionRetryDelay(Long delay);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
index f328279..0e4b513 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -36,6 +36,9 @@ import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
public class FlinkRunnerRegistrar {
private FlinkRunnerRegistrar() { }
+ /**
+ * Pipeline runner registrar.
+ */
@AutoService(PipelineRunnerRegistrar.class)
public static class Runner implements PipelineRunnerRegistrar {
@Override
@@ -46,6 +49,9 @@ public class FlinkRunnerRegistrar {
}
}
+ /**
+ * Pipeline options registrar.
+ */
@AutoService(PipelineOptionsRegistrar.class)
public static class Options implements PipelineOptionsRegistrar {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index dd0733a..90bb64d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -35,9 +35,9 @@ public class FlinkRunnerResult implements PipelineResult {
private final Map<String, Object> aggregators;
private final long runtime;
public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
- this.aggregators = (aggregators == null || aggregators.isEmpty()) ?
- Collections.<String, Object>emptyMap() :
- Collections.unmodifiableMap(aggregators);
+ this.aggregators = (aggregators == null || aggregators.isEmpty())
+ ? Collections.<String, Object>emptyMap()
+ : Collections.unmodifiableMap(aggregators);
this.runtime = runtime;
}
@@ -47,7 +47,8 @@ public class FlinkRunnerResult implements PipelineResult {
}
@Override
- public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
+ public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
+ throws AggregatorRetrievalException {
// TODO provide a list of all accumulator step values
Object value = aggregators.get(aggregator.getName());
if (value != null) {
@@ -65,10 +66,10 @@ public class FlinkRunnerResult implements PipelineResult {
@Override
public String toString() {
- return "FlinkRunnerResult{" +
- "aggregators=" + aggregators +
- ", runtime=" + runtime +
- '}';
+ return "FlinkRunnerResult{"
+ + "aggregators=" + aggregators
+ + ", runtime=" + runtime
+ + '}';
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index dd231d6..67a7d38 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -26,6 +26,9 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
+/**
+ * Test Flink runner.
+ */
public class TestFlinkRunner extends PipelineRunner<FlinkRunnerResult> {
private FlinkRunner delegate;
@@ -37,7 +40,8 @@ public class TestFlinkRunner extends PipelineRunner<FlinkRunnerResult> {
}
public static TestFlinkRunner fromOptions(PipelineOptions options) {
- FlinkPipelineOptions flinkOptions = PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+ FlinkPipelineOptions flinkOptions =
+ PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
return new TestFlinkRunner(flinkOptions);
}
@@ -50,7 +54,7 @@ public class TestFlinkRunner extends PipelineRunner<FlinkRunnerResult> {
@Override
public <OutputT extends POutput, InputT extends PInput>
- OutputT apply(PTransform<InputT,OutputT> transform, InputT input) {
+ OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
return delegate.apply(transform, input);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
new file mode 100644
index 0000000..57f1e59
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 66c48b0..aa38bfb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -91,15 +91,20 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
// get the transformation corresponding to the node we are
// currently visiting and translate it into its Flink alternative.
PTransform<?, ?> transform = node.getTransform();
- BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+ BatchTransformTranslator<?> translator =
+ FlinkBatchTransformTranslators.getTranslator(transform);
if (translator == null) {
LOG.info(node.getTransform().getClass().toString());
- throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+ throw new UnsupportedOperationException("The transform " + transform
+ + " is currently not supported.");
}
applyBatchTransform(transform, node, translator);
}
- private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
+ private <T extends PTransform<?, ?>> void applyBatchTransform(
+ PTransform<?, ?> transform,
+ TransformTreeNode node,
+ BatchTransformTranslator<?> translator) {
@SuppressWarnings("unchecked")
T typedTransform = (T) transform;
@@ -116,8 +121,8 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
/**
* A translator of a {@link PTransform}.
*/
- public interface BatchTransformTranslator<Type extends PTransform> {
- void translateNode(Type transform, FlinkBatchTranslationContext context);
+ public interface BatchTransformTranslator<TransformT extends PTransform> {
+ void translateNode(TransformT transform, FlinkBatchTranslationContext context);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
index 835648e..611f5e6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
@@ -40,7 +40,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
* {@link FlinkBatchTransformTranslators}.
*/
public class FlinkBatchTranslationContext {
-
+
private final Map<PValue, DataSet<?>> dataSets;
private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
@@ -55,9 +55,9 @@ public class FlinkBatchTranslationContext {
private final PipelineOptions options;
private AppliedPTransform<?, ?, ?> currentTransform;
-
+
// ------------------------------------------------------------------------
-
+
public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) {
this.env = env;
this.options = options;
@@ -66,7 +66,7 @@ public class FlinkBatchTranslationContext {
this.danglingDataSets = new HashMap<>();
}
-
+
// ------------------------------------------------------------------------
public Map<PValue, DataSet<?>> getDanglingDataSets() {
@@ -80,7 +80,7 @@ public class FlinkBatchTranslationContext {
public PipelineOptions getPipelineOptions() {
return options;
}
-
+
@SuppressWarnings("unchecked")
public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
// assume that the DataSet is used as an input if retrieved here
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
index 4db929b..cba28e4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
@@ -39,7 +39,7 @@ public abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.D
}
/**
- * Utility formatting method
+ * Utility formatting method.
* @param n number of spaces to generate
* @return String with "|" followed by n spaces
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 3719fa8..4b819b7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -18,9 +18,10 @@
package org.apache.beam.runners.flink.translation;
-import com.google.api.client.util.Maps;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -29,6 +30,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.FlinkCoder;
@@ -63,7 +65,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java
index 71eb655..57b69aa 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java
@@ -18,14 +18,14 @@
package org.apache.beam.runners.flink.translation;
/**
- * The translation mode of the Beam Pipeline
+ * The translation mode of the Beam Pipeline.
*/
public enum TranslationMode {
- /** Uses the batch mode of Flink */
+ /** Uses the batch mode of Flink. */
BATCH,
- /** Uses the streaming mode of Flink */
+ /** Uses the streaming mode of Flink. */
STREAMING
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
new file mode 100644
index 0000000..9f11212
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.functions;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
new file mode 100644
index 0000000..af4b354
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index 4eda357..e210ed9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -33,7 +33,7 @@ import org.apache.flink.core.memory.DataOutputView;
* Dataflow {@link org.apache.beam.sdk.coders.Coder Coders}.
*/
public class CoderTypeSerializer<T> extends TypeSerializer<T> {
-
+
private Coder<T> coder;
public CoderTypeSerializer(Coder<T> coder) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
index f3e667d..41db61e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
@@ -18,7 +18,9 @@
package org.apache.beam.runners.flink.translation.types;
import java.io.IOException;
+
import org.apache.beam.sdk.coders.Coder;
+
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -28,84 +30,84 @@ import org.apache.flink.core.memory.DataOutputView;
*/
public final class EncodedValueSerializer extends TypeSerializer<byte[]> {
- private static final long serialVersionUID = 1L;
-
- private static final byte[] EMPTY = new byte[0];
-
- @Override
- public boolean isImmutableType() {
- return true;
- }
-
- @Override
- public byte[] createInstance() {
- return EMPTY;
- }
-
- @Override
- public byte[] copy(byte[] from) {
- return from;
- }
-
- @Override
- public byte[] copy(byte[] from, byte[] reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
-
- @Override
- public void serialize(byte[] record, DataOutputView target) throws IOException {
- if (record == null) {
- throw new IllegalArgumentException("The record must not be null.");
- }
-
- final int len = record.length;
- target.writeInt(len);
- target.write(record);
- }
-
- @Override
- public byte[] deserialize(DataInputView source) throws IOException {
- final int len = source.readInt();
- byte[] result = new byte[len];
- source.readFully(result);
- return result;
- }
-
- @Override
- public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- final int len = source.readInt();
- target.writeInt(len);
- target.write(source, len);
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof EncodedValueSerializer;
- }
-
- @Override
- public int hashCode() {
- return this.getClass().hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj instanceof EncodedValueSerializer;
- }
-
- @Override
- public TypeSerializer<byte[]> duplicate() {
- return this;
- }
+ private static final long serialVersionUID = 1L;
+
+ private static final byte[] EMPTY = new byte[0];
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public byte[] createInstance() {
+ return EMPTY;
+ }
+
+ @Override
+ public byte[] copy(byte[] from) {
+ return from;
+ }
+
+ @Override
+ public byte[] copy(byte[] from, byte[] reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+
+ @Override
+ public void serialize(byte[] record, DataOutputView target) throws IOException {
+ if (record == null) {
+ throw new IllegalArgumentException("The record must not be null.");
+ }
+
+ final int len = record.length;
+ target.writeInt(len);
+ target.write(record);
+ }
+
+ @Override
+ public byte[] deserialize(DataInputView source) throws IOException {
+ final int len = source.readInt();
+ byte[] result = new byte[len];
+ source.readFully(result);
+ return result;
+ }
+
+ @Override
+ public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ final int len = source.readInt();
+ target.writeInt(len);
+ target.write(source, len);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof EncodedValueSerializer;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getClass().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof EncodedValueSerializer;
+ }
+
+ @Override
+ public TypeSerializer<byte[]> duplicate() {
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
new file mode 100644
index 0000000..6fb3182
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.types;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 0c6cea8..fe2602b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -33,7 +33,7 @@ public class SerializedPipelineOptions implements Serializable {
private final byte[] serializedOptions;
- /** Lazily initialized copy of deserialized options */
+ /** Lazily initialized copy of deserialized options. */
private transient PipelineOptions pipelineOptions;
public SerializedPipelineOptions(PipelineOptions options) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
new file mode 100644
index 0000000..5dedd53
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.utils;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
index 2cb9b18..f2d9db2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.memory.DataOutputView;
* {@link java.io.OutputStream}.
*/
public class DataOutputViewWrapper extends OutputStream {
-
+
private DataOutputView outputView;
public DataOutputViewWrapper(DataOutputView outputView) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
new file mode 100644
index 0000000..72f7deb
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 3b0fccc..3b917e2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -27,6 +27,9 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
@@ -40,11 +43,8 @@ import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
@@ -79,8 +79,8 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
*
* @param <InputT> the input type of the {@link OldDoFn}
* @param <FnOutputT> the output type of the {@link OldDoFn}
- * @param <OutputT> the output type of the operator, this can be different from the fn output type when we have
- * side outputs
+ * @param <OutputT> the output type of the operator, this can be different from the fn output
+ * type when we have side outputs
*/
public class DoFnOperator<InputT, FnOutputT, OutputT>
extends AbstractStreamOperator<OutputT>
@@ -166,7 +166,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
currentInputWatermark = Long.MIN_VALUE;
currentOutputWatermark = currentInputWatermark;
- Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() {
+ Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() {
@Override
public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
Class<?> fnClass,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
index 5751aac..6d2582b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
@@ -22,6 +22,11 @@ import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
+/**
+ * Singleton keyed work item.
+ * @param <K>
+ * @param <ElemT>
+ */
public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
final K key;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index 5e583e9..37454a3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -35,7 +35,13 @@ import org.apache.beam.sdk.util.KeyedWorkItemCoder;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.WindowedValue;
-public class SingletonKeyedWorkItemCoder<K, ElemT> extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> {
+/**
+ * Singleton keyed work item coder.
+ * @param <K>
+ * @param <ElemT>
+ */
+public class SingletonKeyedWorkItemCoder<K, ElemT>
+ extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> {
/**
* Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
* coder.
@@ -68,7 +74,7 @@ public class SingletonKeyedWorkItemCoder<K, ElemT> extends StandardCoder<Singlet
this.keyCoder = keyCoder;
this.elemCoder = elemCoder;
this.windowCoder = windowCoder;
- valueCoder= WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder);
+ valueCoder = WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder);
}
public Coder<K> getKeyCoder() {
@@ -80,7 +86,9 @@ public class SingletonKeyedWorkItemCoder<K, ElemT> extends StandardCoder<Singlet
}
@Override
- public void encode(SingletonKeyedWorkItem<K, ElemT> value, OutputStream outStream, Context context)
+ public void encode(SingletonKeyedWorkItem<K, ElemT> value,
+ OutputStream outStream,
+ Context context)
throws CoderException, IOException {
Context nestedContext = context.nested();
keyCoder.encode(value.key(), outStream, nestedContext);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index b893116..29ae6ae 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -34,6 +34,7 @@ import java.util.Queue;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
index 51d9e0c..7829163 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
@@ -33,7 +33,8 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
* that all key comparisons/hashing happen on the encoded form.
*/
public class WorkItemKeySelector<K, V>
- implements KeySelector<WindowedValue<SingletonKeyedWorkItem<K, V>>, ByteBuffer>, ResultTypeQueryable<ByteBuffer> {
+ implements KeySelector<WindowedValue<SingletonKeyedWorkItem<K, V>>, ByteBuffer>,
+ ResultTypeQueryable<ByteBuffer> {
private final Coder<K> keyCoder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
index 2117e9d..5b01796 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
@@ -62,7 +62,8 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
}
@Override
- public void finalize(Iterable<Object> writerResults, PipelineOptions options) throws Exception {
+ public void finalize(Iterable<Object> writerResults, PipelineOptions options)
+ throws Exception {
}
@@ -70,12 +71,14 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
public Coder<Object> getWriterResultCoder() {
return new Coder<Object>() {
@Override
- public void encode(Object value, OutputStream outStream, Context context) throws CoderException, IOException {
+ public void encode(Object value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
}
@Override
- public Object decode(InputStream inStream, Context context) throws CoderException, IOException {
+ public Object decode(InputStream inStream, Context context)
+ throws CoderException, IOException {
return null;
}
@@ -110,7 +113,9 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
}
@Override
- public void registerByteSizeObserver(Object value, ElementByteSizeObserver observer, Context context) throws Exception {
+ public void registerByteSizeObserver(Object value,
+ ElementByteSizeObserver observer,
+ Context context) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index c6e0825..ac20c34 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -36,17 +36,19 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
private final SourceFunction<T> flinkSource;
- /** Coder set during translation */
+ /** Coder set during translation. */
private Coder<T> coder;
- /** Timestamp / watermark assigner for source; defaults to ingestion time */
- private AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner = new IngestionTimeExtractor<T>();
+ /** Timestamp / watermark assigner for source; defaults to ingestion time. */
+ private AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner =
+ new IngestionTimeExtractor<T>();
public UnboundedFlinkSource(SourceFunction<T> source) {
flinkSource = checkNotNull(source);
}
- public UnboundedFlinkSource(SourceFunction<T> source, AssignerWithPeriodicWatermarks<T> timestampAssigner) {
+ public UnboundedFlinkSource(SourceFunction<T> source,
+ AssignerWithPeriodicWatermarks<T> timestampAssigner) {
flinkSource = checkNotNull(source);
flinkTimestampAssigner = checkNotNull(timestampAssigner);
}
@@ -60,19 +62,25 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
}
@Override
- public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
- throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner.");
+ public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(
+ int desiredNumSplits,
+ PipelineOptions options) throws Exception {
+ throw new RuntimeException("Flink Sources are supported only when "
+ + "running with the FlinkRunner.");
}
@Override
- public UnboundedReader<T> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) {
- throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner.");
+ public UnboundedReader<T> createReader(PipelineOptions options,
+ @Nullable CheckpointMark checkpointMark) {
+ throw new RuntimeException("Flink Sources are supported only when "
+ + "running with the FlinkRunner.");
}
@Nullable
@Override
public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
- throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner.");
+ throw new RuntimeException("Flink Sources are supported only when "
+ + "running with the FlinkRunner.");
}
@@ -100,7 +108,8 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
* @param <T> The type that the source function produces.
* @return The wrapped source function.
*/
- public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(SourceFunction<T> flinkSource) {
+ public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(
+ SourceFunction<T> flinkSource) {
return new UnboundedFlinkSource<>(flinkSource);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index 8d37fe7..96b5138 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -38,9 +38,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * An example unbounded Beam source that reads input from a socket. This is used mainly for testing and debugging.
+ * An example unbounded Beam source that reads input from a socket.
+ * This is used mainly for testing and debugging.
* */
-public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> {
+public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.CheckpointMark>
+ extends UnboundedSource<String, CheckpointMarkT> {
private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
@@ -60,7 +62,11 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
}
- public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
+ public UnboundedSocketSource(String hostname,
+ int port,
+ char delimiter,
+ long maxNumRetries,
+ long delayBetweenRetries) {
this.hostname = hostname;
this.port = port;
this.delimiter = delimiter;
@@ -89,12 +95,15 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
}
@Override
- public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
- return Collections.<UnboundedSource<String, C>>singletonList(this);
+ public List<? extends UnboundedSource<String, CheckpointMarkT>> generateInitialSplits(
+ int desiredNumSplits,
+ PipelineOptions options) throws Exception {
+ return Collections.<UnboundedSource<String, CheckpointMarkT>>singletonList(this);
}
@Override
- public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+ public UnboundedReader<String> createReader(PipelineOptions options,
+ @Nullable CheckpointMarkT checkpointMark) {
return new UnboundedSocketReader(this);
}
@@ -109,7 +118,8 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
@Override
public void validate() {
checkArgument(port > 0 && port < 65536, "port is out of range");
- checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
+ checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), "
+ + "or -1 (infinite retries)");
checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
}
@@ -118,7 +128,11 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
return DEFAULT_SOCKET_CODER;
}
- public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
+ /**
+ * Unbounded socket reader.
+ */
+ public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String>
+ implements Serializable {
private static final long serialVersionUID = 7526472295622776147L;
private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
@@ -138,7 +152,8 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
private void openConnection() throws IOException {
this.socket = new Socket();
- this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME);
+ this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()),
+ CONNECTION_TIMEOUT_TIME);
this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
this.isRunning = true;
}
@@ -149,11 +164,14 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
while (!isRunning) {
try {
openConnection();
- LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort());
+ LOG.info("Connected to server socket " + this.source.getHostname() + ':'
+ + this.source.getPort());
return advance();
} catch (IOException e) {
- LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs...");
+ LOG.info("Lost connection to server socket " + this.source.getHostname() + ':'
+ + this.source.getPort() + ". Retrying in "
+ + this.source.getDelayBetweenRetries() + " msecs...");
if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) {
try {
@@ -167,7 +185,8 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
}
}
}
- LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
+ LOG.error("Unable to connect to host " + this.source.getHostname()
+ + " : " + this.source.getPort());
return false;
}
@@ -211,7 +230,8 @@ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> ext
this.reader.close();
this.socket.close();
this.isRunning = false;
- LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
+ LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":"
+ + this.source.getPort() + ".");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
new file mode 100644
index 0000000..b431ce7
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
new file mode 100644
index 0000000..0674871
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 32339dc..3c30fed 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -52,6 +52,9 @@ import org.junit.Test;
*/
public class PipelineOptionsTest {
+ /**
+ * Pipeline options.
+ */
public interface MyOptions extends FlinkPipelineOptions {
@Description("Bla bla bla")
@Default.String("Hello")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 0988146..37eedb2 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -118,7 +118,8 @@ public class WriteSinkITCase extends JavaProgramTestBase {
}
@Override
- public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
+ public void finalize(Iterable<String> writerResults, PipelineOptions options)
+ throws Exception {
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
new file mode 100644
index 0000000..57f1e59
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 711ae00..628212a 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -56,7 +56,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
- * Tests for {@link FlinkStateInternals}. This is based on the tests for {@code InMemoryStateInternals}.
+ * Tests for {@link FlinkStateInternals}. This is based on the tests for
+ * {@code InMemoryStateInternals}.
*/
@RunWith(JUnit4.class)
public class FlinkStateInternalsTest {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index ab98c27..c6381ee 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -36,6 +36,9 @@ import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.joda.time.Duration;
import org.joda.time.Instant;
+/**
+ * Test for GroupByNullKey.
+ */
public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
@@ -58,6 +61,9 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
}
+ /**
+ * DoFn extracting user and timestamp.
+ */
public static class ExtractUserAndTimestamp extends OldDoFn<KV<Integer, String>, String> {
private static final long serialVersionUID = 0;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
index 64f978f..9410481 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -38,7 +38,7 @@ import org.joda.time.Instant;
/**
- * Session window test
+ * Session window test.
*/
public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
protected String resultPath;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index a70ad49..73124a9 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -56,14 +56,14 @@ public class UnboundedSourceWrapperTest {
*/
@Test
public void testWithOneReader() throws Exception {
- final int NUM_ELEMENTS = 20;
+ final int numElements = 20;
final Object checkpointLock = new Object();
PipelineOptions options = PipelineOptionsFactory.create();
// This source emits exactly numElements elements across all parallel readers
// and then stalls. We check that we also receive numElements elements
// downstream.
- TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+ TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
new UnboundedSourceWrapper<>(options, source, 1);
@@ -92,7 +92,7 @@ public class UnboundedSourceWrapperTest {
StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
count++;
- if (count >= NUM_ELEMENTS) {
+ if (count >= numElements) {
throw new SuccessException();
}
}
@@ -116,14 +116,14 @@ public class UnboundedSourceWrapperTest {
*/
@Test
public void testWithMultipleReaders() throws Exception {
- final int NUM_ELEMENTS = 20;
+ final int numElements = 20;
final Object checkpointLock = new Object();
PipelineOptions options = PipelineOptionsFactory.create();
// This source emits exactly numElements elements across all parallel readers
// and then stalls. We check that we also receive numElements elements
// downstream.
- TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+ TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
new UnboundedSourceWrapper<>(options, source, 4);
@@ -149,10 +149,10 @@ public class UnboundedSourceWrapperTest {
@Override
public void collect(
- StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+ StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
count++;
- if (count >= NUM_ELEMENTS) {
+ if (count >= numElements) {
throw new SuccessException();
}
}
@@ -177,14 +177,14 @@ public class UnboundedSourceWrapperTest {
*/
@Test
public void testRestore() throws Exception {
- final int NUM_ELEMENTS = 20;
+ final int numElements = 20;
final Object checkpointLock = new Object();
PipelineOptions options = PipelineOptionsFactory.create();
// This source emits exactly numElements elements across all parallel readers
// and then stalls. We check that we also receive numElements elements
// downstream.
- TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+ TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
new UnboundedSourceWrapper<>(options, source, 1);
@@ -213,11 +213,11 @@ public class UnboundedSourceWrapperTest {
@Override
public void collect(
- StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+ StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
emittedElements.add(windowedValueStreamRecord.getValue().getValue());
count++;
- if (count >= NUM_ELEMENTS / 2) {
+ if (count >= numElements / 2) {
throw new SuccessException();
}
}
@@ -238,7 +238,7 @@ public class UnboundedSourceWrapperTest {
byte[] snapshot = flinkWrapper.snapshotState(0, 0);
// create a completely new source but restore from the snapshot
- TestCountingSource restoredSource = new TestCountingSource(NUM_ELEMENTS);
+ TestCountingSource restoredSource = new TestCountingSource(numElements);
UnboundedSourceWrapper<
KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
new UnboundedSourceWrapper<>(options, restoredSource, 1);
@@ -271,10 +271,10 @@ public class UnboundedSourceWrapperTest {
@Override
public void collect(
- StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+ StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
emittedElements.add(windowedValueStreamRecord.getValue().getValue());
count++;
- if (count >= NUM_ELEMENTS / 2) {
+ if (count >= numElements / 2) {
throw new SuccessException();
}
}
@@ -292,7 +292,7 @@ public class UnboundedSourceWrapperTest {
assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
// verify that we saw all numElements elements
- assertTrue(emittedElements.size() == NUM_ELEMENTS);
+ assertTrue(emittedElements.size() == numElements);
}
@SuppressWarnings("unchecked")
@@ -310,7 +310,8 @@ public class UnboundedSourceWrapperTest {
when(mockTask.getConfiguration()).thenReturn(cfg);
when(mockTask.getEnvironment()).thenReturn(env);
when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
- when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+ when(mockTask.getAccumulatorMap())
+ .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) mock(Output.class));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5eb44aa0/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
new file mode 100644
index 0000000..08a1e03
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.streaming;