You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2016/03/08 15:53:01 UTC

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/1774

    [FLINK-3591] Replace Quickstart K-Means Example by Streaming Example

    I attached a rough pdf rendering of the changed page.
    
    [Apache Flink 1.1-SNAPSHOT Documentation: Quick Start: Monitoring the Wikipedia Edit Stream.pdf](https://github.com/apache/flink/files/163347/Apache.Flink.1.1-SNAPSHOT.Documentation.Quick.Start.Monitoring.the.Wikipedia.Edit.Stream.pdf)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink doc-fix-quickstart-example

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1774.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1774
    
----
commit ce53fa70512d7817aed64ec30bd056567c7c4f55
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-03-08T14:49:09Z

    [FLINK-3591] Replace Quickstart K-Means Example by Streaming Example

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55374388
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    --- End diff --
    
    Then how about a single sentence instead of 3?
    "We need to specify a window here because we are
    dealing with an infinite stream of events. If you want to compute an aggregation on such an
    infinite stream you never know when you are finished. That's where windows come into play,
    they specify a time slice in which we should perform our computation" ==> I would basically keep the last sentence only.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1774#issuecomment-193822714
  
    Thanks for the thorough review @vasia, that was quick :smile: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55371997
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specified that we want to have tumbling (non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
    +difference of every edit in that time window for a user. The resulting Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
    +
    +The only thing left to do is print the stream to the console and start execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, such as creating
    +sources, transformations and sinks only build up a graph of internal operations. Only when
    +`execute()` is called is this graph of operations thrown on a cluster or executed on your local
    --- End diff --
    
    I wouldn't, because as you said it complicates stuff.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1774#issuecomment-193821304
  
    Just a few misspellings. Otherwise looks great!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55376606
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specified that we want to have tumbling (non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
    +difference of every edit in that time window for a user. The resulting Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
    +
    +The only thing left to do is print the stream to the console and start execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, such as creating
    +sources, transformations and sinks only build up a graph of internal operations. Only when
    +`execute()` is called is this graph of operations thrown on a cluster or executed on your local
    +machine.
    +
    +The complete code so far is this:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    +
    +public class WikipediaAnalysis {
    +
    +  public static void main(String[] args) throws Exception {
    +
    +    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +
    +    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +          return event.getUser();
    +        }
    +      });
    +
    +    DataStream<Tuple2<String, Long>> result = keyedEdits
    +      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +          acc.f0 = event.getUser();
    +          acc.f1 += event.getByteDiff();
    +          return acc;
    +        }
    +      });
    +
    +    result.print();
    +
    +    see.execute();
    +  }
    +}
    +{% endhighlight %}
    +
    +You can run this example in your IDE or on the commandline, using Maven:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
    +{% endhighlight %}
    +
    +The first command build our project and the second executes our main class. The output should be
    +similar to this:
    +
    +{% highlight bash %}
    +1> (Fenix down,114)
    +6> (AnomieBOT,155)
    +8> (BD2412bot,-3690)
    +7> (IgnorantArmies,49)
    +3> (Ckh3111,69)
    +5> (Slade360,0)
    +7> (Narutolovehinata5,2195)
    +6> (Vuyisa2001,79)
    +4> (Ms Sarah Welch,269)
    +4> (KasparBot,-245)
    +{% endhighlight %}
    +
    +The number in front of each line tells you on which parallel instance of the print sink the output
    +was produced.
    +
    +This should get you started with writing your own Flink programs. You can check out our guides
    +about [basic concepts]{{{ site.baseurl }}/apis/common/index.html} and the
    +[DataStream API]{{{ site.baseurl }}/apis/streaming/index.html} if you want to learn more. Stick
    +around for the bonus exercise if you want to learn about setting up a Flink cluster on
    +your own machine and writing results to [Kafka](http://kafka.apache.org).
    +
    +## Bonus Exercise: Running on a Cluster and Writing to Kafka
    +
    +Please follow our [setup quickstart](setup_quickstart.html) for setting up a Flink distribution
    +on your machine and refer to the [Kafka quickstart](https://kafka.apache.org/documentation.html#quickstart)
    +for setting up a Kafka installation before we proceed.
    +
    +As a first step, we have to add the Flink Kafka connector as a dependency so that we can
    +use the Kafka sink. Add this to the `pom.xml` file in the dependencies section:
    +
    +{% highlight xml %}
    +<dependency>
    +    <groupId>org.apache.flink</groupId>
    +    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    +    <version>${flink.version}</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Next, we need to modify our program. We'll remove the `print()` sink and instead use a
    +Kafka sink. The new code looks like this:
    +
    +{% highlight java %}
    +
    +result
    +    .map(new MapFunction<Tuple2<String,Long>, String>() {
    +        @Override
    +        public String map(Tuple2<String, Long> tuple) {
    +            return tuple.toString();
    +        }
    +    })
    +    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
    +{% endhighlight %}
    +
    +Note how we first transform the Stream of `Tuple2<String, Long>` to a Stream of `String` using
    +a MapFunction. We are doing this because it is easier to write plain strings to Kafka. Then,
    +we create a Kafka sink. You might have to adapt the hostname and port to your setup, `"wiki-result"`
    +is the name of the Kafka stream that we are going to create next, before running our program.
    +Build the project using Maven because we need the jar file for running on the cluster:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +{% endhighlight %}
    +
    +The resulting jar file will be in the `target` subfolder: `target/wiki-edits-0.1.jar`. We'll use
    +this later.
    +
    +Now we are ready to launch a Flink cluster and run the program that writes to Kafka on it. Go
    +to the location where you installed Flink and start a local cluster:
    +
    +{% highlight bash %}
    +$ cd my/flink/directory
    +$ bin/start-local.sh
    +{% endhighlight %}
    +
    +We also have to create the Kafka Topic, so that we our program can write to it:
    +
    +{% highlight bash %}
    +$ cd my/kafka/directory
    +$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results
    +{% endhighlight %}
    +
    +Now we are ready to run our jar file on the local Flink cluster:
    +{% highlight bash %}
    +$ cd my/flink/directory
    +$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar
    +{% endhighlight %}
    +
    +The output of that command should look similar to this, if everything went according to plan:
    +
    +```
    +03/08/2016 15:09:27 Job execution switched to status RUNNING.
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
    +```
    +
    +You can see how the individual operators start running. There are only two because
    +the operations after the window get folded into one operation for performance reasons. In Flink
    +we call this *chaining*.
    +
    +You can observe the output of the program by inspecting the Kafka topic using the Kafka
    +console consumer:
    +
    +{% highlight bash %}
    +bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wiki-result
    +{% endhighlight %}
    +
    +You can also check out the Flink dashboard which should be running at [http://localhost:8081](http://localhost:8081).
    +You get an overview of your cluster ressources and running jobs:
    +
    +<a href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-overview.png" ><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-overview.png" alt="JobManager Overview"/></a>
     
    -## Setup Flink
    -Follow the [instructions](setup_quickstart.html) to setup Flink and enter the root directory of your Flink setup.
    -
    -## Generate Input Data
    -Flink contains a data generator for K-Means.
    -
    -~~~bash
    -# Assuming you are in the root directory of your Flink setup
    -mkdir kmeans
    -cd kmeans
    -# Run data generator
    -java -cp ../examples/batch/KMeans.jar:../lib/flink-dist-{{ site.version }}.jar \
    -  org.apache.flink.examples.java.clustering.util.KMeansDataGenerator \
    -  -points 500 -k 10 -stddev 0.08 -output `pwd`
    -~~~
    -
    -The generator has the following arguments (arguments in `[]` are optional):
    -
    -~~~bash
    --points <num> -k <num clusters> [-output <output-path>] [-stddev <relative stddev>] [-range <centroid range>] [-seed <seed>]
    -~~~
    -
    -The _relative standard deviation_ is an interesting tuning parameter. It determines the closeness of the points to randomly generated centers.
    -
    -The `kmeans/` directory should now contain two files: `centers` and `points`. The `points` file contains the points to cluster and the `centers` file contains initial cluster centers.
    -
    -
    -## Inspect the Input Data
    -Use the `plotPoints.py` tool to review the generated data points. [Download Python Script](plotPoints.py)
    -
    -~~~ bash
    -python plotPoints.py points ./points input
    -~~~ 
    -
    -Note: You might have to install [matplotlib](http://matplotlib.org/) (`python-matplotlib` package on Ubuntu) to use the Python script.
    -
    -You can review the input data stored in the `input-plot.pdf`, for example with Evince (`evince input-plot.pdf`).
    -
    -The following overview presents the impact of the different standard deviations on the input data.
    -
    -|relative stddev = 0.03|relative stddev = 0.08|relative stddev = 0.15|
    -|:--------------------:|:--------------------:|:--------------------:|
    -|<img src="{{ site.baseurl }}/page/img/quickstart-example/kmeans003.png" alt="example1" style="width: 275px;"/>|<img src="{{ site.baseurl }}/page/img/quickstart-example/kmeans008.png" alt="example2" style="width: 275px;"/>|<img src="{{ site.baseurl }}/page/img/quickstart-example/kmeans015.png" alt="example3" style="width: 275px;"/>|
    -
    -
    -## Start Flink
    -Start Flink and the web job submission client on your local machine.
    -
    -~~~ bash
    -# return to the Flink root directory
    -cd ..
    -# start Flink
    -./bin/start-local.sh
    -~~~
    -
    -## Inspect and Run the K-Means Example Program
    -The Flink web interface allows to submit Flink programs using a graphical user interface.
    -
    -<div class="row" style="padding-top:15px">
    -	<div class="col-md-6">
    -		<a data-lightbox="compiler" href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_submit.png" data-lightbox="example-1"><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_submit.png" /></a>
    -	</div>
    -	<div class="col-md-6">
    -		1. Open web interface on <a href="http://localhost:8081">localhost:8081</a> <br>
    -		2. Select the "Submit new Job" page in the menu <br>
    -		3. Upload the <code>KMeans.jar</code> from <code>examples/batch</code> by clicking the "Add New" button, and then the "Upload" button. <br>
    -		4. Select the <code>KMeans.jar</code> form the list of jobs <br>
    -		5. Enter the arguments and options in the lower box: <br>
    -		    Leave the <i>Entry Class</i> and <i>Parallelism</i> form empty<br>
    -		    Enter the following program arguments: <br>
    -		    (KMeans expects the following args: <code>--points &lt;path&gt; --centroids &lt;path&gt; --output &lt;path&gt; --iterations &lt;n&gt;</code>
    -			{% highlight bash %}--points /tmp/kmeans/points --centroids /tmp/kmeans/centers --output /tmp/kmeans/result --iterations 10{% endhighlight %}<br>
    -		6. Press <b>Submit</b> to start the job
    -	</div>
    -</div>
    -<hr>
    -<div class="row" style="padding-top:15px">
    -	<div class="col-md-6">
    -		<a data-lightbox="compiler" href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_execute.png" data-lightbox="example-1"><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_execute.png" /></a>
    -	</div>
    -
    -	<div class="col-md-6">
    -		Watch the job executing.
    -	</div>
    -</div>
    -
    -
    -## Shutdown Flink
    -Stop Flink when you are done.
    -
    -~~~ bash
    -# stop Flink
    -./bin/stop-local.sh
    -~~~
    -
    -## Analyze the Result
    -Use the [Python Script](plotPoints.py) again to visualize the result.
    -
    -~~~bash
    -cd kmeans
    -python plotPoints.py result ./result clusters
    -~~~
    -
    -The following three pictures show the results for the sample input above. Play around with the parameters (number of iterations, number of clusters) to see how they affect the result.
    -
    -
    -|relative stddev = 0.03|relative stddev = 0.08|relative stddev = 0.15|
    -|:--------------------:|:--------------------:|:--------------------:|
    -|<img src="{{ site.baseurl }}/page/img/quickstart-example/result003.png" alt="example1" style="width: 275px;"/>|<img src="{{ site.baseurl }}/page/img/quickstart-example/result008.png" alt="example2" style="width: 275px;"/>|<img src="{{ site.baseurl }}/page/img/quickstart-example/result015.png" alt="example3" style="width: 275px;"/>|
    +If you click on your running job you will get a view where you can inspect individual operations
    +and, for example, see the number of processed elements:
    +
    +<a href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-job.png" ><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-job.png" alt="Example Job View"/></a>
     
    +This concludes our little tour of Flink. If you have any questions, please don't hesitate to ask on our [Mailing Lists](http://mail-archives.apache.org/mod_mbox/flink-user/).
    --- End diff --
    
    Let's link to the community mailing lists page so people can sign up easily?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55375307
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    --- End diff --
    
    Wikipedia capitalization


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/1774


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55376365
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specified that we want to have tumbling (non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
    +difference of every edit in that time window for a user. The resulting Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
    +
    +The only thing left to do is print the stream to the console and start execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, such as creating
    +sources, transformations and sinks only build up a graph of internal operations. Only when
    +`execute()` is called is this graph of operations thrown on a cluster or executed on your local
    +machine.
    +
    +The complete code so far is this:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    +
    +public class WikipediaAnalysis {
    +
    +  public static void main(String[] args) throws Exception {
    +
    +    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +
    +    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +          return event.getUser();
    +        }
    +      });
    +
    +    DataStream<Tuple2<String, Long>> result = keyedEdits
    +      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +          acc.f0 = event.getUser();
    +          acc.f1 += event.getByteDiff();
    +          return acc;
    +        }
    +      });
    +
    +    result.print();
    +
    +    see.execute();
    +  }
    +}
    +{% endhighlight %}
    +
    +You can run this example in your IDE or on the commandline, using Maven:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
    +{% endhighlight %}
    +
    +The first command build our project and the second executes our main class. The output should be
    +similar to this:
    +
    +{% highlight bash %}
    +1> (Fenix down,114)
    +6> (AnomieBOT,155)
    +8> (BD2412bot,-3690)
    +7> (IgnorantArmies,49)
    +3> (Ckh3111,69)
    +5> (Slade360,0)
    +7> (Narutolovehinata5,2195)
    +6> (Vuyisa2001,79)
    +4> (Ms Sarah Welch,269)
    +4> (KasparBot,-245)
    +{% endhighlight %}
    +
    +The number in front of each line tells you on which parallel instance of the print sink the output
    +was produced.
    +
    +This should get you started with writing your own Flink programs. You can check out our guides
    +about [basic concepts]{{{ site.baseurl }}/apis/common/index.html} and the
    +[DataStream API]{{{ site.baseurl }}/apis/streaming/index.html} if you want to learn more. Stick
    +around for the bonus exercise if you want to learn about setting up a Flink cluster on
    +your own machine and writing results to [Kafka](http://kafka.apache.org).
    +
    +## Bonus Exercise: Running on a Cluster and Writing to Kafka
    +
    +Please follow our [setup quickstart](setup_quickstart.html) for setting up a Flink distribution
    +on your machine and refer to the [Kafka quickstart](https://kafka.apache.org/documentation.html#quickstart)
    +for setting up a Kafka installation before we proceed.
    +
    +As a first step, we have to add the Flink Kafka connector as a dependency so that we can
    +use the Kafka sink. Add this to the `pom.xml` file in the dependencies section:
    +
    +{% highlight xml %}
    +<dependency>
    +    <groupId>org.apache.flink</groupId>
    +    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    +    <version>${flink.version}</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Next, we need to modify our program. We'll remove the `print()` sink and instead use a
    +Kafka sink. The new code looks like this:
    +
    +{% highlight java %}
    +
    +result
    +    .map(new MapFunction<Tuple2<String,Long>, String>() {
    +        @Override
    +        public String map(Tuple2<String, Long> tuple) {
    +            return tuple.toString();
    +        }
    +    })
    +    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
    +{% endhighlight %}
    +
    +Note how we first transform the Stream of `Tuple2<String, Long>` to a Stream of `String` using
    +a MapFunction. We are doing this because it is easier to write plain strings to Kafka. Then,
    +we create a Kafka sink. You might have to adapt the hostname and port to your setup, `"wiki-result"`
    +is the name of the Kafka stream that we are going to create next, before running our program.
    +Build the project using Maven because we need the jar file for running on the cluster:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +{% endhighlight %}
    +
    +The resulting jar file will be in the `target` subfolder: `target/wiki-edits-0.1.jar`. We'll use
    +this later.
    +
    +Now we are ready to launch a Flink cluster and run the program that writes to Kafka on it. Go
    +to the location where you installed Flink and start a local cluster:
    +
    +{% highlight bash %}
    +$ cd my/flink/directory
    +$ bin/start-local.sh
    +{% endhighlight %}
    +
    +We also have to create the Kafka Topic, so that we our program can write to it:
    +
    +{% highlight bash %}
    +$ cd my/kafka/directory
    +$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results
    +{% endhighlight %}
    +
    +Now we are ready to run our jar file on the local Flink cluster:
    +{% highlight bash %}
    +$ cd my/flink/directory
    +$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar
    +{% endhighlight %}
    +
    +The output of that command should look similar to this, if everything went according to plan:
    +
    +```
    +03/08/2016 15:09:27 Job execution switched to status RUNNING.
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
    +```
    +
    +You can see how the individual operators start running. There are only two because
    --- End diff --
    
    comma before because?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55371362
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specified that we want to have tumbling (non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
    +difference of every edit in that time window for a user. The resulting Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
    +
    +The only thing left to do is print the stream to the console and start execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, such as creating
    +sources, transformations and sinks only build up a graph of internal operations. Only when
    +`execute()` is called is this graph of operations thrown on a cluster or executed on your local
    --- End diff --
    
    Shall we mention that the `print()`/`execute()` combination works differently in the batch case or would it make things complicated?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55371459
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specified that we want to have tumbling (non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
    +difference of every edit in that time window for a user. The resulting Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
    +
    +The only thing left to do is print the stream to the console and start execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, such as creating
    +sources, transformations and sinks only build up a graph of internal operations. Only when
    +`execute()` is called is this graph of operations thrown on a cluster or executed on your local
    +machine.
    +
    +The complete code so far is this:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    +
    +public class WikipediaAnalysis {
    +
    +  public static void main(String[] args) throws Exception {
    +
    +    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +
    +    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +          return event.getUser();
    +        }
    +      });
    +
    +    DataStream<Tuple2<String, Long>> result = keyedEdits
    +      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +          acc.f0 = event.getUser();
    +          acc.f1 += event.getByteDiff();
    +          return acc;
    +        }
    +      });
    +
    +    result.print();
    +
    +    see.execute();
    +  }
    +}
    +{% endhighlight %}
    +
    +You can run this example in your IDE or on the commandline, using Maven:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
    +{% endhighlight %}
    +
    +The first command build our project and the second executes our main class. The output should be
    --- End diff --
    
    *builds


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55370936
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    --- End diff --
    
    I would move the explanation of why we need windows in streaming (and maybe a link to the windows docs) in the beginning.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55375144
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    --- End diff --
    
    Maven capitalization


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55376436
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specified that we want to have tumbling (non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
    +difference of every edit in that time window for a user. The resulting Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
    +
    +The only thing left to do is print the stream to the console and start execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, such as creating
    +sources, transformations and sinks only build up a graph of internal operations. Only when
    +`execute()` is called is this graph of operations thrown on a cluster or executed on your local
    +machine.
    +
    +The complete code so far is this:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    +
    +public class WikipediaAnalysis {
    +
    +  public static void main(String[] args) throws Exception {
    +
    +    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +
    +    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +          return event.getUser();
    +        }
    +      });
    +
    +    DataStream<Tuple2<String, Long>> result = keyedEdits
    +      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +          acc.f0 = event.getUser();
    +          acc.f1 += event.getByteDiff();
    +          return acc;
    +        }
    +      });
    +
    +    result.print();
    +
    +    see.execute();
    +  }
    +}
    +{% endhighlight %}
    +
    +You can run this example in your IDE or on the commandline, using Maven:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
    +{% endhighlight %}
    +
    +The first command build our project and the second executes our main class. The output should be
    +similar to this:
    +
    +{% highlight bash %}
    +1> (Fenix down,114)
    +6> (AnomieBOT,155)
    +8> (BD2412bot,-3690)
    +7> (IgnorantArmies,49)
    +3> (Ckh3111,69)
    +5> (Slade360,0)
    +7> (Narutolovehinata5,2195)
    +6> (Vuyisa2001,79)
    +4> (Ms Sarah Welch,269)
    +4> (KasparBot,-245)
    +{% endhighlight %}
    +
    +The number in front of each line tells you on which parallel instance of the print sink the output
    +was produced.
    +
    +This should get you started with writing your own Flink programs. You can check out our guides
    +about [basic concepts]{{{ site.baseurl }}/apis/common/index.html} and the
    +[DataStream API]{{{ site.baseurl }}/apis/streaming/index.html} if you want to learn more. Stick
    +around for the bonus exercise if you want to learn about setting up a Flink cluster on
    +your own machine and writing results to [Kafka](http://kafka.apache.org).
    +
    +## Bonus Exercise: Running on a Cluster and Writing to Kafka
    +
    +Please follow our [setup quickstart](setup_quickstart.html) for setting up a Flink distribution
    +on your machine and refer to the [Kafka quickstart](https://kafka.apache.org/documentation.html#quickstart)
    +for setting up a Kafka installation before we proceed.
    +
    +As a first step, we have to add the Flink Kafka connector as a dependency so that we can
    +use the Kafka sink. Add this to the `pom.xml` file in the dependencies section:
    +
    +{% highlight xml %}
    +<dependency>
    +    <groupId>org.apache.flink</groupId>
    +    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    +    <version>${flink.version}</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Next, we need to modify our program. We'll remove the `print()` sink and instead use a
    +Kafka sink. The new code looks like this:
    +
    +{% highlight java %}
    +
    +result
    +    .map(new MapFunction<Tuple2<String,Long>, String>() {
    +        @Override
    +        public String map(Tuple2<String, Long> tuple) {
    +            return tuple.toString();
    +        }
    +    })
    +    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
    +{% endhighlight %}
    +
    +Note how we first transform the Stream of `Tuple2<String, Long>` to a Stream of `String` using
    +a MapFunction. We are doing this because it is easier to write plain strings to Kafka. Then,
    +we create a Kafka sink. You might have to adapt the hostname and port to your setup, `"wiki-result"`
    +is the name of the Kafka stream that we are going to create next, before running our program.
    +Build the project using Maven because we need the jar file for running on the cluster:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +{% endhighlight %}
    +
    +The resulting jar file will be in the `target` subfolder: `target/wiki-edits-0.1.jar`. We'll use
    +this later.
    +
    +Now we are ready to launch a Flink cluster and run the program that writes to Kafka on it. Go
    +to the location where you installed Flink and start a local cluster:
    +
    +{% highlight bash %}
    +$ cd my/flink/directory
    +$ bin/start-local.sh
    +{% endhighlight %}
    +
    +We also have to create the Kafka Topic, so that we our program can write to it:
    +
    +{% highlight bash %}
    +$ cd my/kafka/directory
    +$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results
    +{% endhighlight %}
    +
    +Now we are ready to run our jar file on the local Flink cluster:
    +{% highlight bash %}
    +$ cd my/flink/directory
    +$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar
    +{% endhighlight %}
    +
    +The output of that command should look similar to this, if everything went according to plan:
    +
    +```
    +03/08/2016 15:09:27 Job execution switched to status RUNNING.
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
    +```
    +
    +You can see how the individual operators start running. There are only two because
    +the operations after the window get folded into one operation for performance reasons. In Flink
    +we call this *chaining*.
    +
    +You can observe the output of the program by inspecting the Kafka topic using the Kafka
    +console consumer:
    +
    +{% highlight bash %}
    +bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wiki-result
    +{% endhighlight %}
    +
    +You can also check out the Flink dashboard which should be running at [http://localhost:8081](http://localhost:8081).
    +You get an overview of your cluster ressources and running jobs:
    --- End diff --
    
    typo: ressources


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55373645
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    --- End diff --
    
    Yes, that's a tricky one. I don't think we have an explanation of why windows are required anywhere in the doc. I also don't think however, that this guide is the right place, since I want this to be very concise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55369437
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    --- End diff --
    
    fo => go


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55372164
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specified that we want to have tumbling (non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
    +difference of every edit in that time window for a user. The resulting Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
    +
    +The only thing left to do is print the stream to the console and start execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, such as creating
    +sources, transformations and sinks only build up a graph of internal operations. Only when
    +`execute()` is called is this graph of operations thrown on a cluster or executed on your local
    +machine.
    +
    +The complete code so far is this:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    +
    +public class WikipediaAnalysis {
    +
    +  public static void main(String[] args) throws Exception {
    +
    +    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +
    +    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +          return event.getUser();
    +        }
    +      });
    +
    +    DataStream<Tuple2<String, Long>> result = keyedEdits
    +      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +          acc.f0 = event.getUser();
    +          acc.f1 += event.getByteDiff();
    +          return acc;
    +        }
    +      });
    +
    +    result.print();
    +
    +    see.execute();
    +  }
    +}
    +{% endhighlight %}
    +
    +You can run this example in your IDE or on the commandline, using Maven:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
    +{% endhighlight %}
    +
    +The first command build our project and the second executes our main class. The output should be
    +similar to this:
    +
    +{% highlight bash %}
    +1> (Fenix down,114)
    +6> (AnomieBOT,155)
    +8> (BD2412bot,-3690)
    +7> (IgnorantArmies,49)
    +3> (Ckh3111,69)
    +5> (Slade360,0)
    +7> (Narutolovehinata5,2195)
    +6> (Vuyisa2001,79)
    +4> (Ms Sarah Welch,269)
    +4> (KasparBot,-245)
    +{% endhighlight %}
    +
    +The number in front of each line tells you on which parallel instance of the print sink the output
    +was produced.
    +
    +This should get you started with writing your own Flink programs. You can check out our guides
    +about [basic concepts]{{{ site.baseurl }}/apis/common/index.html} and the
    +[DataStream API]{{{ site.baseurl }}/apis/streaming/index.html} if you want to learn more. Stick
    +around for the bonus exercise if you want to learn about setting up a Flink cluster on
    +your own machine and writing results to [Kafka](http://kafka.apache.org).
    +
    +## Bonus Exercise: Running on a Cluster and Writing to Kafka
    +
    +Please follow our [setup quickstart](setup_quickstart.html) for setting up a Flink distribution
    +on your machine and refer to the [Kafka quickstart](https://kafka.apache.org/documentation.html#quickstart)
    +for setting up a Kafka installation before we proceed.
    +
    +As a first step, we have to add the Flink Kafka connector as a dependency so that we can
    +use the Kafka sink. Add this to the `pom.xml` file in the dependencies section:
    +
    +{% highlight xml %}
    +<dependency>
    +    <groupId>org.apache.flink</groupId>
    +    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    +    <version>${flink.version}</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Next, we need to modify our program. We'll remove the `print()` sink and instead use a
    +Kafka sink. The new code looks like this:
    +
    +{% highlight java %}
    +
    +result
    +    .map(new MapFunction<Tuple2<String,Long>, String>() {
    +        @Override
    +        public String map(Tuple2<String, Long> tuple) {
    +            return tuple.toString();
    +        }
    +    })
    +    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
    +{% endhighlight %}
    +
    +Note how we first transform the Stream of `Tuple2<String, Long>` to a Stream of `String` using
    +a MapFunction. We are doing this because it is easier to write plain strings to Kafka. Then,
    +we create a Kafka sink. You might have to adapt the hostname and port to your setup, `"wiki-result"`
    +is the name of the Kafka stream that we are going to create next, before running our program.
    +Build the project using Maven because we need the jar file for running on the cluster:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +{% endhighlight %}
    +
    +The resulting jar file will be in the `target` subfolder: `target/wiki-edits-0.1.jar`. We'll use
    +this later.
    +
    +Now we are ready to launch a Flink cluster and run the program that writes to Kafka on it. Go
    +to the location where you installed Flink and start a local cluster:
    +
    +{% highlight bash %}
    +$ cd my/flink/directory
    +$ bin/start-local.sh
    +{% endhighlight %}
    +
    +We also have to create the Kafka Topic, so that we our program can write to it:
    --- End diff --
    
    remove "we"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55371996
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specified that we want to have tumbling (non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
    +difference of every edit in that time window for a user. The resulting Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
    +
    +The only thing left to do is print the stream to the console and start execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, such as creating
    +sources, transformations and sinks only build up a graph of internal operations. Only when
    +`execute()` is called is this graph of operations thrown on a cluster or executed on your local
    +machine.
    +
    +The complete code so far is this:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    +
    +public class WikipediaAnalysis {
    +
    +  public static void main(String[] args) throws Exception {
    +
    +    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +
    +    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +          return event.getUser();
    +        }
    +      });
    +
    +    DataStream<Tuple2<String, Long>> result = keyedEdits
    +      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +          acc.f0 = event.getUser();
    +          acc.f1 += event.getByteDiff();
    +          return acc;
    +        }
    +      });
    +
    +    result.print();
    +
    +    see.execute();
    +  }
    +}
    +{% endhighlight %}
    +
    +You can run this example in your IDE or on the commandline, using Maven:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
    +{% endhighlight %}
    +
    +The first command build our project and the second executes our main class. The output should be
    +similar to this:
    +
    +{% highlight bash %}
    +1> (Fenix down,114)
    +6> (AnomieBOT,155)
    +8> (BD2412bot,-3690)
    +7> (IgnorantArmies,49)
    +3> (Ckh3111,69)
    +5> (Slade360,0)
    +7> (Narutolovehinata5,2195)
    +6> (Vuyisa2001,79)
    +4> (Ms Sarah Welch,269)
    +4> (KasparBot,-245)
    +{% endhighlight %}
    +
    +The number in front of each line tells you on which parallel instance of the print sink the output
    +was produced.
    +
    +This should get you started with writing your own Flink programs. You can check out our guides
    +about [basic concepts]{{{ site.baseurl }}/apis/common/index.html} and the
    +[DataStream API]{{{ site.baseurl }}/apis/streaming/index.html} if you want to learn more. Stick
    +around for the bonus exercise if you want to learn about setting up a Flink cluster on
    +your own machine and writing results to [Kafka](http://kafka.apache.org).
    +
    +## Bonus Exercise: Running on a Cluster and Writing to Kafka
    +
    +Please follow our [setup quickstart](setup_quickstart.html) for setting up a Flink distribution
    +on your machine and refer to the [Kafka quickstart](https://kafka.apache.org/documentation.html#quickstart)
    +for setting up a Kafka installation before we proceed.
    +
    +As a first step, we have to add the Flink Kafka connector as a dependency so that we can
    +use the Kafka sink. Add this to the `pom.xml` file in the dependencies section:
    +
    +{% highlight xml %}
    +<dependency>
    +    <groupId>org.apache.flink</groupId>
    +    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    +    <version>${flink.version}</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Next, we need to modify our program. We'll remove the `print()` sink and instead use a
    +Kafka sink. The new code looks like this:
    +
    +{% highlight java %}
    +
    +result
    +    .map(new MapFunction<Tuple2<String,Long>, String>() {
    +        @Override
    +        public String map(Tuple2<String, Long> tuple) {
    +            return tuple.toString();
    +        }
    +    })
    +    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
    +{% endhighlight %}
    +
    +Note how we first transform the Stream of `Tuple2<String, Long>` to a Stream of `String` using
    +a MapFunction. We are doing this because it is easier to write plain strings to Kafka. Then,
    +we create a Kafka sink. You might have to adapt the hostname and port to your setup, `"wiki-result"`
    --- End diff --
    
    Full stop at "setup"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55371080
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specified that we want to have tumbling (non-overlapping) windows
    --- End diff --
    
    *specifies


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55375679
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    --- End diff --
    
    typo interested


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55375868
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    --- End diff --
    
    What do you mean with "same" stream? Should we just say stream?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55371429
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    +We can now specify that we want to have windows imposed on this stream and compute some
    +result based on elements in these windows. We need to specify a window here because we are
    +dealing with an infinite stream of events. If you want to compute an aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play,
    +they specify a time slice in which we should perform our computation. In our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specified that we want to have tumbling (non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
    +difference of every edit in that time window for a user. The resulting Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
    +
    +The only thing left to do is print the stream to the console and start execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, such as creating
    +sources, transformations and sinks only build up a graph of internal operations. Only when
    +`execute()` is called is this graph of operations thrown on a cluster or executed on your local
    +machine.
    +
    +The complete code so far is this:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    +
    +public class WikipediaAnalysis {
    +
    +  public static void main(String[] args) throws Exception {
    +
    +    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +
    +    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +          return event.getUser();
    +        }
    +      });
    +
    +    DataStream<Tuple2<String, Long>> result = keyedEdits
    +      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +          acc.f0 = event.getUser();
    +          acc.f1 += event.getByteDiff();
    +          return acc;
    +        }
    +      });
    +
    +    result.print();
    +
    +    see.execute();
    +  }
    +}
    +{% endhighlight %}
    +
    +You can run this example in your IDE or on the commandline, using Maven:
    --- End diff --
    
    *command line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55376360
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that I'll not give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a `StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interrested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this should take the key into account. In our case the summation of edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` key that is the user.
    --- End diff --
    
    Yes, seems better



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1774#issuecomment-193828321
  
    Very very very nice! Thanks for doing this! I like it a lot.
    
    I was wondering whether we want to have a Scala version as well. What do you think? We can do it as a follow up if you don't have time for it now.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55375032
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    --- End diff --
    
    go from to?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55370299
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink. 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    +read this channel in Flink and count the number of bytes that each user edits within
    +a given window of time. This is easy enough to implement in a few minutes using Flink but it will
    +give you a good foundation from which to start building more complex analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project stucture. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    --- End diff --
    
    src/main/java/...?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1774#issuecomment-193830427
  
    Thanks @uce as well! :smiley: 
    
    I incorporated all the fixes. The only thing I'm not happy with is the section with the window. I feel that I can't do it without some small explanation but it also can't be to big/in-depth.
    
    Scala I would do as a follow-up.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3591] Replace Quickstart K-Means Exampl...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1774#issuecomment-193844948
  
    Alright, I merged it with the fixes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---