You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/09/07 04:11:25 UTC

[1/2] incubator-beam git commit: [BEAM-242] Enable and fix checkstyle in Flink runner examples

Repository: incubator-beam
Updated Branches:
  refs/heads/master 387854624 -> 26635d7fb


[BEAM-242] Enable and fix checkstyle in Flink runner examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dafb8055
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dafb8055
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dafb8055

Branch: refs/heads/master
Commit: dafb80556c1d984630c6ccf615ba982903f176df
Parents: 3878546
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Sep 6 07:26:45 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Sep 7 05:55:50 2016 +0200

----------------------------------------------------------------------
 runners/flink/examples/pom.xml                  |  2 --
 .../beam/runners/flink/examples/WordCount.java  |  9 ++++++
 .../runners/flink/examples/package-info.java    | 22 +++++++++++++
 .../flink/examples/streaming/AutoComplete.java  |  5 +--
 .../flink/examples/streaming/JoinExamples.java  |  3 +-
 .../examples/streaming/KafkaIOExamples.java     | 34 ++++++++++----------
 .../KafkaWindowedWordCountExample.java          | 27 +++++++++++++---
 .../examples/streaming/WindowedWordCount.java   | 19 +++++++----
 .../flink/examples/streaming/package-info.java  | 22 +++++++++++++
 9 files changed, 110 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
index 9f705db..b8489fc 100644
--- a/runners/flink/examples/pom.xml
+++ b/runners/flink/examples/pom.xml
@@ -109,12 +109,10 @@
         </executions>
       </plugin>
 
-      <!-- Checkstyle errors for now
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
       </plugin>
-      -->
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index ab9297f..9cce757 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -36,8 +36,14 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
+/**
+ * Wordcount pipeline.
+ */
 public class WordCount {
 
+  /**
+   * Function to extract words.
+   */
   public static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
@@ -60,6 +66,9 @@ public class WordCount {
     }
   }
 
+  /**
+   * PTransform counting words.
+   */
   public static class CountWords extends PTransform<PCollection<String>,
                     PCollection<KV<String, Long>>> {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java
new file mode 100644
index 0000000..b0ecb56
--- /dev/null
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Flink Beam runner example.
+ */
+package org.apache.beam.runners.flink.examples;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 9b5e31d..4636e3f 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -96,7 +96,8 @@ public class AutoComplete {
 
               @ProcessElement
               public void processElement(ProcessContext c) {
-                CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue());
+                CompletionCandidate cand = new CompletionCandidate(c.element().getKey(),
+                    c.element().getValue());
                 c.output(cand);
               }
             }));
@@ -349,7 +350,7 @@ public class AutoComplete {
       StringBuilder str = new StringBuilder();
       KV<String, List<CompletionCandidate>> elem = c.element();
 
-      str.append(elem.getKey() +" @ "+ window +" -> ");
+      str.append(elem.getKey() + " @ " + window + " -> ");
       for (CompletionCandidate cand: elem.getValue()) {
         str.append(cand.toString() + " ");
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index bf5dfc4..96638aa 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -132,7 +132,8 @@ public class JoinExamples {
     options.setExecutionRetryDelay(3000L);
     options.setRunner(FlinkRunner.class);
 
-    WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+    WindowFn<Object, ?> windowFn = FixedWindows.of(
+        Duration.standardSeconds(options.getWindowSize()));
 
     Pipeline p = Pipeline.create(options);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index 27faefe..f0bf188 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -56,15 +56,15 @@ public class KafkaIOExamples {
   private static final String KAFKA_AVRO_TOPIC = "output";  // Default kafka topic to read from
   private static final String KAFKA_BROKER = "localhost:9092";  // Default kafka broker to contact
   private static final String GROUP_ID = "myGroup";  // Default groupId
-  private static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper to connect to for Kafka
+  private static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper to connect (Kafka)
 
   /**
-   * Read/Write String data to Kafka
+   * Read/Write String data to Kafka.
    */
   public static class KafkaString {
 
     /**
-     * Read String data from Kafka
+     * Read String data from Kafka.
      */
     public static class ReadStringFromKafka {
 
@@ -88,7 +88,7 @@ public class KafkaIOExamples {
     }
 
     /**
-     * Write String data to Kafka
+     * Write String data to Kafka.
      */
     public static class WriteStringToKafka {
 
@@ -113,12 +113,12 @@ public class KafkaIOExamples {
   }
 
   /**
-   * Read/Write Avro data to Kafka
+   * Read/Write Avro data to Kafka.
    */
   public static class KafkaAvro {
 
     /**
-     * Read Avro data from Kafka
+     * Read Avro data from Kafka.
      */
     public static class ReadAvroFromKafka {
 
@@ -142,7 +142,7 @@ public class KafkaIOExamples {
     }
 
     /**
-     * Write Avro data to Kafka
+     * Write Avro data to Kafka.
      */
     public static class WriteAvroToKafka {
 
@@ -169,7 +169,7 @@ public class KafkaIOExamples {
     }
 
     /**
-     * Serialiation/Deserialiation schema for Avro types
+     * Serialization/Deserialization schema for Avro types.
      * @param <T>
      */
     static class AvroSerializationDeserializationSchema<T>
@@ -217,7 +217,7 @@ public class KafkaIOExamples {
     }
 
     /**
-     * Custom type for Avro serialization
+     * Custom type for Avro serialization.
      */
     static class MyType implements Serializable {
 
@@ -233,10 +233,10 @@ public class KafkaIOExamples {
 
       @Override
       public String toString() {
-        return "MyType{" +
-            "word='" + word + '\'' +
-            ", count=" + count +
-            '}';
+        return "MyType{"
+            + "word='" + word + '\''
+            + ", count=" + count
+            + '}';
       }
     }
   }
@@ -244,7 +244,7 @@ public class KafkaIOExamples {
   // -------------- Utilities --------------
 
   /**
-   * Custom options for the Pipeline
+   * Custom options for the Pipeline.
    */
   public interface KafkaOptions extends FlinkPipelineOptions {
     @Description("The Kafka topic to read from")
@@ -279,7 +279,7 @@ public class KafkaIOExamples {
   }
 
   /**
-   * Initializes some options for the Flink runner
+   * Initializes some options for the Flink runner.
    * @param args The command line args
    * @return the pipeline
    */
@@ -298,7 +298,7 @@ public class KafkaIOExamples {
   }
 
   /**
-   * Gets KafkaOptions from the Pipeline
+   * Gets KafkaOptions from the Pipeline.
    * @param p the pipeline
    * @return KafkaOptions
    */
@@ -322,7 +322,7 @@ public class KafkaIOExamples {
   }
 
   /**
-   * Print contents to stdout
+   * Print contents to stdout.
    * @param <T> type of the input
    */
   private static class PrintFn<T> extends DoFn<T, T> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 365fb7b..42c42f3 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -40,6 +40,9 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.joda.time.Duration;
 
+/**
+ * Wordcount example using Kafka topic.
+ */
 public class KafkaWindowedWordCountExample {
 
   static final String KAFKA_TOPIC = "test";  // Default kafka topic to read from
@@ -47,6 +50,9 @@ public class KafkaWindowedWordCountExample {
   static final String GROUP_ID = "myGroup";  // Default groupId
   static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper to connect to for Kafka
 
+  /**
+   * Function to extract words.
+   */
   public static class ExtractWordsFn extends DoFn<String, String> {
     private final Aggregator<Long, Long> emptyLines =
         createAggregator("emptyLines", new Sum.SumLongFn());
@@ -69,16 +75,24 @@ public class KafkaWindowedWordCountExample {
     }
   }
 
+  /**
+   * Function to format KV as String.
+   */
   public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
     @ProcessElement
     public void processElement(ProcessContext c) {
-      String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+      String row = c.element().getKey() + " - " + c.element().getValue() + " @ "
+          + c.timestamp().toString();
       System.out.println(row);
       c.output(row);
     }
   }
 
-  public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions {
+  /**
+   * Pipeline options.
+   */
+  public interface KafkaStreamingWordCountOptions
+      extends WindowedWordCount.StreamingWordCountOptions {
     @Description("The Kafka topic to read from")
     @Default.String(KAFKA_TOPIC)
     String getKafkaTopic();
@@ -107,7 +121,8 @@ public class KafkaWindowedWordCountExample {
 
   public static void main(String[] args) {
     PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
-    KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
+    KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args)
+        .as(KafkaStreamingWordCountOptions.class);
     options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
     options.setStreaming(true);
     options.setCheckpointingInterval(1000L);
@@ -115,7 +130,8 @@ public class KafkaWindowedWordCountExample {
     options.setExecutionRetryDelay(3000L);
     options.setRunner(FlinkRunner.class);
 
-    System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() );
+    System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " "
+        + options.getBroker() + " " + options.getGroup());
     Pipeline pipeline = Pipeline.create(options);
 
     Properties p = new Properties();
@@ -132,7 +148,8 @@ public class KafkaWindowedWordCountExample {
     PCollection<String> words = pipeline
         .apply("StreamingWordCount", Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
         .apply(ParDo.of(new ExtractWordsFn()))
-        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
+        .apply(Window.<String>into(FixedWindows.of(
+            Duration.standardSeconds(options.getWindowSize())))
             .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
             .discardingFiredPanes());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index f3361c5..0e250b8 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -60,7 +60,8 @@ public class WindowedWordCount {
   static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
     @ProcessElement
     public void processElement(ProcessContext c) {
-      String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
+      String row = c.element().getKey() + " - " + c.element().getValue() + " @ "
+          + c.timestamp().toString();
       c.output(row);
     }
   }
@@ -87,7 +88,11 @@ public class WindowedWordCount {
     }
   }
 
-  public interface StreamingWordCountOptions extends org.apache.beam.runners.flink.examples.WordCount.Options {
+  /**
+   * Pipeline options.
+   */
+  public interface StreamingWordCountOptions
+      extends org.apache.beam.runners.flink.examples.WordCount.Options {
     @Description("Sliding window duration, in seconds")
     @Default.Long(WINDOW_SIZE)
     Long getWindowSize();
@@ -102,7 +107,8 @@ public class WindowedWordCount {
   }
 
   public static void main(String[] args) throws IOException {
-    StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class);
+    StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+        .as(StreamingWordCountOptions.class);
     options.setStreaming(true);
     options.setWindowSize(10L);
     options.setSlide(5L);
@@ -111,8 +117,8 @@ public class WindowedWordCount {
     options.setExecutionRetryDelay(3000L);
     options.setRunner(FlinkRunner.class);
 
-    LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() +
-        " sec. and a slide of " + options.getSlide());
+    LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize()
+        + " sec. and a slide of " + options.getSlide());
 
     Pipeline pipeline = Pipeline.create(options);
 
@@ -120,7 +126,8 @@ public class WindowedWordCount {
         .apply("StreamingWordCount",
             Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
         .apply(ParDo.of(new ExtractWordsFn()))
-        .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
+        .apply(Window.<String>into(SlidingWindows.of(
+            Duration.standardSeconds(options.getWindowSize()))
             .every(Duration.standardSeconds(options.getSlide())))
             .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
             .discardingFiredPanes());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafb8055/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
new file mode 100644
index 0000000..58f41b6
--- /dev/null
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Flink Beam runner example.
+ */
+package org.apache.beam.runners.flink.examples.streaming;


[2/2] incubator-beam git commit: [BEAM-242] This closes #919

Posted by jb...@apache.org.
[BEAM-242] This closes #919


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/26635d7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/26635d7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/26635d7f

Branch: refs/heads/master
Commit: 26635d7fb3d92185845d269909a3d399099df7da
Parents: 3878546 dafb805
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed Sep 7 06:11:02 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed Sep 7 06:11:02 2016 +0200

----------------------------------------------------------------------
 runners/flink/examples/pom.xml                  |  2 --
 .../beam/runners/flink/examples/WordCount.java  |  9 ++++++
 .../runners/flink/examples/package-info.java    | 22 +++++++++++++
 .../flink/examples/streaming/AutoComplete.java  |  5 +--
 .../flink/examples/streaming/JoinExamples.java  |  3 +-
 .../examples/streaming/KafkaIOExamples.java     | 34 ++++++++++----------
 .../KafkaWindowedWordCountExample.java          | 27 +++++++++++++---
 .../examples/streaming/WindowedWordCount.java   | 19 +++++++----
 .../flink/examples/streaming/package-info.java  | 22 +++++++++++++
 9 files changed, 110 insertions(+), 33 deletions(-)
----------------------------------------------------------------------