You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/09/07 04:11:25 UTC
[1/2] incubator-beam git commit: [BEAM-242] Enable and fix checkstyle
in Flink runner examples
Repository: incubator-beam
Updated Branches:
refs/heads/master 387854624 -> 26635d7fb
[BEAM-242] Enable and fix checkstyle in Flink runner examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dafb8055
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dafb8055
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dafb8055
Branch: refs/heads/master
Commit: dafb80556c1d984630c6ccf615ba982903f176df
Parents: 3878546
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Sep 6 07:26:45 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Sep 7 05:55:50 2016 +0200
----------------------------------------------------------------------
runners/flink/examples/pom.xml | 2 --
.../beam/runners/flink/examples/WordCount.java | 9 ++++++
.../runners/flink/examples/package-info.java | 22 +++++++++++++
.../flink/examples/streaming/AutoComplete.java | 5 +--
.../flink/examples/streaming/JoinExamples.java | 3 +-
.../examples/streaming/KafkaIOExamples.java | 34 ++++++++++----------
.../KafkaWindowedWordCountExample.java | 27 +++++++++++++---
.../examples/streaming/WindowedWordCount.java | 19 +++++++----
.../flink/examples/streaming/package-info.java | 22 +++++++++++++
9 files changed, 110 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
index 9f705db..b8489fc 100644
--- a/runners/flink/examples/pom.xml
+++ b/runners/flink/examples/pom.xml
@@ -109,12 +109,10 @@
</executions>
</plugin>
- <!-- Checkstyle errors for now
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
- -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index ab9297f..9cce757 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -36,8 +36,14 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+/**
+ * Wordcount pipeline.
+ */
public class WordCount {
+ /**
+ * Function to extract words.
+ */
public static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
@@ -60,6 +66,9 @@ public class WordCount {
}
}
+ /**
+ * PTransform counting words.
+ */
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java
new file mode 100644
index 0000000..b0ecb56
--- /dev/null
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Flink Beam runner example.
+ */
+package org.apache.beam.runners.flink.examples;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 9b5e31d..4636e3f 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -96,7 +96,8 @@ public class AutoComplete {
@ProcessElement
public void processElement(ProcessContext c) {
- CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
+ CompletionCandidate cand = new CompletionCandidate(c.element().getKey(),
+ c.element().getValue());
c.output(cand);
}
}));
@@ -349,7 +350,7 @@ public class AutoComplete {
StringBuilder str = new StringBuilder();
KV<String, List<CompletionCandidate>> elem = c.element();
- str.append(elem.getKey() +" @ "+ window +" -> ");
+ str.append(elem.getKey() + " @ " + window + " -> ");
for (CompletionCandidate cand: elem.getValue()) {
str.append(cand.toString() + " ");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index bf5dfc4..96638aa 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -132,7 +132,8 @@ public class JoinExamples {
options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkRunner.class);
- WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+ WindowFn<Object, ?> windowFn = FixedWindows.of(
+ Duration.standardSeconds(options.getWindowSize()));
Pipeline p = Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index 27faefe..f0bf188 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -56,15 +56,15 @@ public class KafkaIOExamples {
private static final String KAFKA_AVRO_TOPIC = "output"; // Default kafka topic to read from
private static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact
private static final String GROUP_ID = "myGroup"; // Default groupId
- private static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka
+ private static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect (Kafka)
/**
- * Read/Write String data to Kafka
+ * Read/Write String data to Kafka.
*/
public static class KafkaString {
/**
- * Read String data from Kafka
+ * Read String data from Kafka.
*/
public static class ReadStringFromKafka {
@@ -88,7 +88,7 @@ public class KafkaIOExamples {
}
/**
- * Write String data to Kafka
+ * Write String data to Kafka.
*/
public static class WriteStringToKafka {
@@ -113,12 +113,12 @@ public class KafkaIOExamples {
}
/**
- * Read/Write Avro data to Kafka
+ * Read/Write Avro data to Kafka.
*/
public static class KafkaAvro {
/**
- * Read Avro data from Kafka
+ * Read Avro data from Kafka.
*/
public static class ReadAvroFromKafka {
@@ -142,7 +142,7 @@ public class KafkaIOExamples {
}
/**
- * Write Avro data to Kafka
+ * Write Avro data to Kafka.
*/
public static class WriteAvroToKafka {
@@ -169,7 +169,7 @@ public class KafkaIOExamples {
}
/**
- * Serialiation/Deserialiation schema for Avro types
+ * Serialiation/Deserialiation schema for Avro types.
* @param <T>
*/
static class AvroSerializationDeserializationSchema<T>
@@ -217,7 +217,7 @@ public class KafkaIOExamples {
}
/**
- * Custom type for Avro serialization
+ * Custom type for Avro serialization.
*/
static class MyType implements Serializable {
@@ -233,10 +233,10 @@ public class KafkaIOExamples {
@Override
public String toString() {
- return "MyType{" +
- "word='" + word + '\'' +
- ", count=" + count +
- '}';
+ return "MyType{"
+ + "word='" + word + '\''
+ + ", count=" + count
+ + '}';
}
}
}
@@ -244,7 +244,7 @@ public class KafkaIOExamples {
// -------------- Utilities --------------
/**
- * Custom options for the Pipeline
+ * Custom options for the Pipeline.
*/
public interface KafkaOptions extends FlinkPipelineOptions {
@Description("The Kafka topic to read from")
@@ -279,7 +279,7 @@ public class KafkaIOExamples {
}
/**
- * Initializes some options for the Flink runner
+ * Initializes some options for the Flink runner.
* @param args The command line args
* @return the pipeline
*/
@@ -298,7 +298,7 @@ public class KafkaIOExamples {
}
/**
- * Gets KafkaOptions from the Pipeline
+ * Gets KafkaOptions from the Pipeline.
* @param p the pipeline
* @return KafkaOptions
*/
@@ -322,7 +322,7 @@ public class KafkaIOExamples {
}
/**
- * Print contents to stdout
+ * Print contents to stdout.
* @param <T> type of the input
*/
private static class PrintFn<T> extends DoFn<T, T> {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 365fb7b..42c42f3 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -40,6 +40,9 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.joda.time.Duration;
+/**
+ * Wordcount example using Kafka topic.
+ */
public class KafkaWindowedWordCountExample {
static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from
@@ -47,6 +50,9 @@ public class KafkaWindowedWordCountExample {
static final String GROUP_ID = "myGroup"; // Default groupId
static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka
+ /**
+ * Function to extract words.
+ */
public static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
@@ -69,16 +75,24 @@ public class KafkaWindowedWordCountExample {
}
}
+ /**
+ * Function to format KV as String.
+ */
public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
- String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+ String row = c.element().getKey() + " - " + c.element().getValue() + " @ "
+ + c.timestamp().toString();
System.out.println(row);
c.output(row);
}
}
- public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions {
+ /**
+ * Pipeline options.
+ */
+ public interface KafkaStreamingWordCountOptions
+ extends WindowedWordCount.StreamingWordCountOptions {
@Description("The Kafka topic to read from")
@Default.String(KAFKA_TOPIC)
String getKafkaTopic();
@@ -107,7 +121,8 @@ public class KafkaWindowedWordCountExample {
public static void main(String[] args) {
PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
- KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
+ KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args)
+ .as(KafkaStreamingWordCountOptions.class);
options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
options.setStreaming(true);
options.setCheckpointingInterval(1000L);
@@ -115,7 +130,8 @@ public class KafkaWindowedWordCountExample {
options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkRunner.class);
- System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() );
+ System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " "
+ + options.getBroker() + " " + options.getGroup());
Pipeline pipeline = Pipeline.create(options);
Properties p = new Properties();
@@ -132,7 +148,8 @@ public class KafkaWindowedWordCountExample {
PCollection<String> words = pipeline
.apply("StreamingWordCount", Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
.apply(ParDo.of(new ExtractWordsFn()))
- .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
+ .apply(Window.<String>into(FixedWindows.of(
+ Duration.standardSeconds(options.getWindowSize())))
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index f3361c5..0e250b8 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -60,7 +60,8 @@ public class WindowedWordCount {
static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
@ProcessElement
public void processElement(ProcessContext c) {
- String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+ String row = c.element().getKey() + " - " + c.element().getValue() + " @ "
+ + c.timestamp().toString();
c.output(row);
}
}
@@ -87,7 +88,11 @@ public class WindowedWordCount {
}
}
- public interface StreamingWordCountOptions extends org.apache.beam.runners.flink.examples.WordCount.Options {
+ /**
+ * Pipeline options.
+ */
+ public interface StreamingWordCountOptions
+ extends org.apache.beam.runners.flink.examples.WordCount.Options {
@Description("Sliding window duration, in seconds")
@Default.Long(WINDOW_SIZE)
Long getWindowSize();
@@ -102,7 +107,8 @@ public class WindowedWordCount {
}
public static void main(String[] args) throws IOException {
- StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class);
+ StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+ .as(StreamingWordCountOptions.class);
options.setStreaming(true);
options.setWindowSize(10L);
options.setSlide(5L);
@@ -111,8 +117,8 @@ public class WindowedWordCount {
options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkRunner.class);
- LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() +
- " sec. and a slide of " + options.getSlide());
+ LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize()
+ + " sec. and a slide of " + options.getSlide());
Pipeline pipeline = Pipeline.create(options);
@@ -120,7 +126,8 @@ public class WindowedWordCount {
.apply("StreamingWordCount",
Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
.apply(ParDo.of(new ExtractWordsFn()))
- .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
+ .apply(Window.<String>into(SlidingWindows.of(
+ Duration.standardSeconds(options.getWindowSize()))
.every(Duration.standardSeconds(options.getSlide())))
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
new file mode 100644
index 0000000..58f41b6
--- /dev/null
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Flink Beam runner example.
+ */
+package org.apache.beam.runners.flink.examples.streaming;
[2/2] incubator-beam git commit: [BEAM-242] This closes #919
Posted by jb...@apache.org.
[BEAM-242] This closes #919
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/26635d7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/26635d7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/26635d7f
Branch: refs/heads/master
Commit: 26635d7fb3d92185845d269909a3d399099df7da
Parents: 3878546 dafb805
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Sep 7 06:11:02 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Sep 7 06:11:02 2016 +0200
----------------------------------------------------------------------
runners/flink/examples/pom.xml | 2 --
.../beam/runners/flink/examples/WordCount.java | 9 ++++++
.../runners/flink/examples/package-info.java | 22 +++++++++++++
.../flink/examples/streaming/AutoComplete.java | 5 +--
.../flink/examples/streaming/JoinExamples.java | 3 +-
.../examples/streaming/KafkaIOExamples.java | 34 ++++++++++----------
.../KafkaWindowedWordCountExample.java | 27 +++++++++++++---
.../examples/streaming/WindowedWordCount.java | 19 +++++++----
.../flink/examples/streaming/package-info.java | 22 +++++++++++++
9 files changed, 110 insertions(+), 33 deletions(-)
----------------------------------------------------------------------