Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/29 21:58:21 UTC

[1/3] flink git commit: [FLINK-3551] [examples] Sync Scala streaming examples with Java examples.

Repository: flink
Updated Branches:
  refs/heads/master e3bef5569 -> 55ab34ff3


http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
new file mode 100644
index 0000000..0a26a35
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
+import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData
+import org.apache.flink.streaming.scala.examples.iteration.IterateExample
+import org.apache.flink.streaming.scala.examples.join.WindowJoin
+import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
+import org.apache.flink.streaming.scala.examples.ml.IncrementalLearningSkeleton
+import org.apache.flink.streaming.scala.examples.twitter.TwitterExample
+import org.apache.flink.streaming.scala.examples.windowing.{SessionWindowing, WindowWordCount}
+import org.apache.flink.streaming.scala.examples.wordcount.WordCount
+import org.apache.flink.streaming.test.examples.join.WindowJoinData
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.TestBaseUtils
+
+import org.junit.Test
+
+/**
+ * Integration test for streaming programs in Scala examples.
+ */
+class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testIterateExample(): Unit = {
+    val inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
+    val resultPath = getTempDirPath("result")
+
+    // the example is inherently non-deterministic. The iteration timeout of 5000 ms
+    // is frequently not enough to make the test run stably on CI infrastructure
+    // with very small containers, so we cannot validate the output here
+    IterateExample.main(Array(
+      "--input", inputPath,
+      "--output", resultPath
+    ))
+  }
+
+  @Test
+  def testWindowJoin(): Unit = {
+    val resultPath = File.createTempFile("result-path", "dir").toURI.toString
+    try {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment
+      env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+      val grades = env
+        .fromCollection(WindowJoinData.GRADES_INPUT.split("\n"))
+        .map( line => {
+          val fields = line.split(",")
+          Grade(fields(1), fields(2).toInt)
+        })
+
+      val salaries = env
+        .fromCollection(WindowJoinData.SALARIES_INPUT.split("\n"))
+        .map( line => {
+          val fields = line.split(",")
+          Salary(fields(1), fields(2).toInt)
+        })
+
+      WindowJoin.joinStreams(grades, salaries, 100)
+        .writeAsText(resultPath, WriteMode.OVERWRITE)
+
+      env.execute()
+
+      TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)")
+    }
+    finally {
+      try FileUtils.deleteDirectory(new File(resultPath))
+      catch {
+        case _: Throwable =>
+      }
+    }
+  }
+
+  @Test
+  def testIncrementalLearningSkeleton(): Unit = {
+    val resultPath = getTempDirPath("result")
+    IncrementalLearningSkeleton.main(Array("--output", resultPath))
+    TestBaseUtils.compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath)
+  }
+
+  @Test
+  def testTwitterExample(): Unit = {
+    val resultPath = getTempDirPath("result")
+    TwitterExample.main(Array("--output", resultPath))
+    TestBaseUtils.compareResultsByLinesInMemory(
+      TwitterExampleData.STREAMING_COUNTS_AS_TUPLES,
+      resultPath)
+  }
+
+  @Test
+  def testSessionWindowing(): Unit = {
+    val resultPath = getTempDirPath("result")
+    SessionWindowing.main(Array("--output", resultPath))
+    TestBaseUtils.compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath)
+  }
+
+  @Test
+  def testWindowWordCount(): Unit = {
+    val windowSize = "250"
+    val slideSize = "150"
+    val textPath = createTempFile("text.txt", WordCountData.TEXT)
+    val resultPath = getTempDirPath("result")
+
+    WindowWordCount.main(Array(
+      "--input", textPath,
+      "--output", resultPath,
+      "--window", windowSize,
+      "--slide", slideSize
+    ))
+
+    // since the parallel tokenizers may run at different speeds, the exact
+    // output cannot be checked; we only check that it is well-formed,
+    // i.e. that the result lines look like e.g. (faust, 2)
+    TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)")
+  }
+
+  @Test
+  def testWordCount(): Unit = {
+    val textPath = createTempFile("text.txt", WordCountData.TEXT)
+    val resultPath = getTempDirPath("result")
+
+    WordCount.main(Array(
+      "--input", textPath,
+      "--output", resultPath
+    ))
+
+    TestBaseUtils.compareResultsByLinesInMemory(
+      WordCountData.STREAMING_COUNTS_AS_TUPLES,
+      resultPath)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
deleted file mode 100644
index 0e67be5..0000000
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples
-
-import java.io.File
-
-import org.apache.commons.io.FileUtils
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.scala.examples.join.WindowJoin
-import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
-import org.apache.flink.streaming.test.examples.join.WindowJoinData
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-
-class WindowJoinITCase extends StreamingMultipleProgramsTestBase {
-  
-  @Test
-  def testProgram(): Unit = {
-    
-    val resultPath: String = File.createTempFile("result-path", "dir").toURI().toString()
-    try {
-      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-      env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-      
-      val grades: DataStream[Grade] = env
-        .fromCollection(WindowJoinData.GRADES_INPUT.split("\n"))
-        .map( line => {
-          val fields = line.split(",")
-          Grade(fields(1), fields(2).toInt)
-        })
-
-      val salaries: DataStream[Salary] = env
-        .fromCollection(WindowJoinData.SALARIES_INPUT.split("\n"))
-        .map( line => {
-          val fields = line.split(",")
-          Salary(fields(1), fields(2).toInt)
-        })
-      
-      WindowJoin.joinStreams(grades, salaries, 100)
-        .writeAsText(resultPath, WriteMode.OVERWRITE)
-      
-      env.execute()
-
-      TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)")
-    }
-    finally {
-      try {
-        FileUtils.deleteDirectory(new File(resultPath))
-      }
-      catch {
-        case _ : Throwable => 
-      }
-    }
-  }
-
-}


[3/3] flink git commit: [FLINK-7025] [table] Port non-partitioned unbounded proctime Over window to keyed state.

Posted by fh...@apache.org.
[FLINK-7025] [table] Port non-partitioned unbounded proctime Over window to keyed state.

This closes #4212.
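
The diff below replaces the separate CheckpointedFunction/operator-state implementation
with a single keyed-state implementation: the non-partitioned input is routed through a
NullByteKeySelector, which assigns every record the same key, so the single parallel
instance of the OVER aggregation can use keyed state and the keyed state-cleanup timers.
A minimal, self-contained sketch of that constant-key trick (a hypothetical running sum
stands in for the generated aggregation function; this is not the commit's code):

    import java.lang.{Long => JLong}

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    object ConstantKeySketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.fromElements(1L, 2L, 3L, 4L)
          // every record maps to the same constant key, so the downstream
          // ProcessFunction runs on a keyed stream and may use keyed state
          .keyBy(_ => 0)
          .process(new ProcessFunction[Long, Long] {

            // keyed ValueState replaces the operator ListState of the removed class
            private var sum: ValueState[JLong] = _

            override def open(config: Configuration): Unit = {
              sum = getRuntimeContext.getState(
                new ValueStateDescriptor[JLong]("sum", createTypeInformation[JLong]))
            }

            override def processElement(
                value: Long,
                ctx: ProcessFunction[Long, Long]#Context,
                out: Collector[Long]): Unit = {
              val prev: Long = if (sum.value() == null) 0L else sum.value()
              val acc = prev + value
              sum.update(acc)
              out.collect(acc)
            }
          }).setParallelism(1)
          .print()

        env.execute("constant-key keyed state sketch")
      }
    }

Since every element lands on the same key, the operator effectively runs with a single
active key anyway, which is why the real operator also pins setParallelism(1).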


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55ab34ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55ab34ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55ab34ff

Branch: refs/heads/master
Commit: 55ab34ff3b92b2567bcab80f39826a9667f1c9ce
Parents: 1cf1620
Author: sunjincheng121 <su...@gmail.com>
Authored: Wed Jun 28 23:19:56 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jun 29 23:58:18 2017 +0200

----------------------------------------------------------------------
 .../datastream/DataStreamOverAggregate.scala    |  15 +--
 .../table/runtime/aggregate/AggregateUtil.scala |  15 +--
 .../ProcTimeUnboundedNonPartitionedOver.scala   | 115 -------------------
 .../aggregate/ProcTimeUnboundedOver.scala       | 102 ++++++++++++++++
 .../ProcTimeUnboundedPartitionedOver.scala      | 102 ----------------
 .../api/scala/stream/sql/OverWindowITCase.scala |   8 +-
 .../runtime/harness/OverWindowHarnessTest.scala |   4 +-
 7 files changed, 118 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 08f0356..c03dac6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -242,17 +242,10 @@ class DataStreamOverAggregate(
       }
       // non-partitioned aggregation
       else {
-        if (isRowTimeType) {
-          inputDS.keyBy(new NullByteKeySelector[CRow])
-            .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(returnTypeInfo)
-            .name(aggOpName)
-        } else {
-          inputDS
-            .process(processFunction).setParallelism(1).setMaxParallelism(1)
-            .returns(returnTypeInfo)
-            .name(aggOpName)
-        }
+        inputDS.keyBy(new NullByteKeySelector[CRow])
+          .process(processFunction).setParallelism(1).setMaxParallelism(1)
+          .returns(returnTypeInfo)
+          .name(aggOpName)
       }
     result
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 04c5070..f4ead48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -129,17 +129,10 @@ object AggregateUtil {
           queryConfig)
       }
     } else {
-      if (isPartitioned) {
-        new ProcTimeUnboundedPartitionedOver(
-          genFunction,
-          aggregationStateType,
-          queryConfig)
-      } else {
-        new ProcTimeUnboundedNonPartitionedOver(
-          genFunction,
-          aggregationStateType,
-          queryConfig)
-      }
+      new ProcTimeUnboundedOver(
+        genFunction,
+        aggregationStateType,
+        queryConfig)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
deleted file mode 100644
index f86bed2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.StreamQueryConfig
-import org.apache.flink.util.Collector
-import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-
-/**
-  * Process Function for non-partitioned processing-time unbounded OVER window
-  *
-  * @param genAggregations Generated aggregate helper function
-  * @param aggregationStateType     row type info of aggregation
-  */
-class ProcTimeUnboundedNonPartitionedOver(
-    genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: RowTypeInfo,
-    queryConfig: StreamQueryConfig)
-  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
-    with CheckpointedFunction
-    with Compiler[GeneratedAggregations] {
-
-  private var accumulators: Row = _
-  private var output: CRow = _
-  private var state: ListState[Row] = _
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: GeneratedAggregations = _
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
-                s"Code:\n${genAggregations.code}")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genAggregations.name,
-      genAggregations.code)
-    LOG.debug("Instantiating AggregateHelper.")
-    function = clazz.newInstance()
-
-    output = new CRow(function.createOutputRow(), true)
-    if (null == accumulators) {
-      val it = state.get().iterator()
-      if (it.hasNext) {
-        accumulators = it.next()
-      } else {
-        accumulators = function.createAccumulators()
-      }
-    }
-    initCleanupTimeState("ProcTimeUnboundedNonPartitionedOverCleanupTime")
-  }
-
-  override def processElement(
-      inputC: CRow,
-      ctx: ProcessFunction[CRow, CRow]#Context,
-      out: Collector[CRow]): Unit = {
-    // register state-cleanup timer
-    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
-
-
-    val input = inputC.row
-
-    function.setForwardedFields(input, output.row)
-
-    function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output.row)
-
-    out.collect(output)
-  }
-
-  override def onTimer(
-    timestamp: Long,
-    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
-    out: Collector[CRow]): Unit = {
-
-    if (needToCleanupState(timestamp)) {
-      cleanupState(state)
-    }
-  }
-
-  override def snapshotState(context: FunctionSnapshotContext): Unit = {
-    state.clear()
-    if (null != accumulators) {
-      state.add(accumulators)
-    }
-  }
-
-  override def initializeState(context: FunctionInitializationContext): Unit = {
-    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType)
-    state = context.getOperatorStateStore.getListState(accumulatorsDescriptor)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
new file mode 100644
index 0000000..7a7b44d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.CRow
+import org.slf4j.LoggerFactory
+
+/**
+  * Process Function for processing-time unbounded OVER window
+  *
+  * @param genAggregations Generated aggregate helper function
+  * @param aggregationStateType     row type info of aggregation
+  */
+class ProcTimeUnboundedOver(
+    genAggregations: GeneratedAggregationsFunction,
+    aggregationStateType: RowTypeInfo,
+    queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+    with Compiler[GeneratedAggregations] {
+
+  private var output: CRow = _
+  private var state: ValueState[Row] = _
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
+
+  override def open(config: Configuration) {
+    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
+                s"Code:\n${genAggregations.code}")
+    val clazz = compile(
+      getRuntimeContext.getUserCodeClassLoader,
+      genAggregations.name,
+      genAggregations.code)
+    LOG.debug("Instantiating AggregateHelper.")
+    function = clazz.newInstance()
+
+    output = new CRow(function.createOutputRow(), true)
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("overState", aggregationStateType)
+    state = getRuntimeContext.getState(stateDescriptor)
+
+    initCleanupTimeState("ProcTimeUnboundedPartitionedOverCleanupTime")
+  }
+
+  override def processElement(
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    // register state-cleanup timer
+    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
+    val input = inputC.row
+
+    var accumulators = state.value()
+
+    if (null == accumulators) {
+      accumulators = function.createAccumulators()
+    }
+
+    function.setForwardedFields(input, output.row)
+
+    function.accumulate(accumulators, input)
+    function.setAggregationResults(accumulators, output.row)
+
+    state.update(accumulators)
+    out.collect(output)
+  }
+
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    out: Collector[CRow]): Unit = {
+
+    if (needToCleanupState(timestamp)) {
+      cleanupState(state)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
deleted file mode 100644
index ad43d94..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.state.ValueStateDescriptor
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.table.api.StreamQueryConfig
-import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.apache.flink.table.runtime.types.CRow
-import org.slf4j.LoggerFactory
-
-/**
-  * Process Function for processing-time unbounded OVER window
-  *
-  * @param genAggregations Generated aggregate helper function
-  * @param aggregationStateType     row type info of aggregation
-  */
-class ProcTimeUnboundedPartitionedOver(
-    genAggregations: GeneratedAggregationsFunction,
-    aggregationStateType: RowTypeInfo,
-    queryConfig: StreamQueryConfig)
-  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
-    with Compiler[GeneratedAggregations] {
-
-  private var output: CRow = _
-  private var state: ValueState[Row] = _
-  val LOG = LoggerFactory.getLogger(this.getClass)
-  private var function: GeneratedAggregations = _
-
-  override def open(config: Configuration) {
-    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
-                s"Code:\n${genAggregations.code}")
-    val clazz = compile(
-      getRuntimeContext.getUserCodeClassLoader,
-      genAggregations.name,
-      genAggregations.code)
-    LOG.debug("Instantiating AggregateHelper.")
-    function = clazz.newInstance()
-
-    output = new CRow(function.createOutputRow(), true)
-    val stateDescriptor: ValueStateDescriptor[Row] =
-      new ValueStateDescriptor[Row]("overState", aggregationStateType)
-    state = getRuntimeContext.getState(stateDescriptor)
-
-    initCleanupTimeState("ProcTimeUnboundedPartitionedOverCleanupTime")
-  }
-
-  override def processElement(
-    inputC: CRow,
-    ctx: ProcessFunction[CRow, CRow]#Context,
-    out: Collector[CRow]): Unit = {
-
-    // register state-cleanup timer
-    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
-
-    val input = inputC.row
-
-    var accumulators = state.value()
-
-    if (null == accumulators) {
-      accumulators = function.createAccumulators()
-    }
-
-    function.setForwardedFields(input, output.row)
-
-    function.accumulate(accumulators, input)
-    function.setAggregationResults(accumulators, output.row)
-
-    state.update(accumulators)
-    out.collect(output)
-  }
-
-  override def onTimer(
-    timestamp: Long,
-    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
-    out: Collector[CRow]): Unit = {
-
-    if (needToCleanupState(timestamp)) {
-      cleanupState(state)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index 7a24f50..397e72c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.table.api.scala.stream.sql
 
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala.stream.sql.OverWindowITCase.EventTimeSourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, TableException}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
@@ -217,6 +218,9 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
   @Test
   def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+    val queryConfig =
+      new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(2), Time.hours(3))
+
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStateBackend(getStateBackend)
     val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -235,7 +239,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 786a843..8cad64f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.runtime.harness
 
 import java.lang.{Integer => JInt, Long => JLong}
-import java.util.concurrent.{ConcurrentLinkedQueue}
+import java.util.concurrent.ConcurrentLinkedQueue
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
@@ -276,7 +276,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
   def testProcTimeUnboundedOver(): Unit = {
 
     val processFunction = new KeyedProcessOperator[String, CRow, CRow](
-      new ProcTimeUnboundedPartitionedOver(
+      new ProcTimeUnboundedOver(
         genMinMaxAggFunction,
         minMaxAggregationStateType,
         queryConfig))


[2/3] flink git commit: [FLINK-3551] [examples] Sync Scala streaming examples with Java examples.

Posted by fh...@apache.org.
[FLINK-3551] [examples] Sync Scala streaming examples with Java examples.

- Move Java example tests into a single ITCase
- Add ITCase for Scala examples

This closes #2761.
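
Each test in the consolidated ITCases follows the same pattern: write the test data to a
temporary file, run the example end-to-end through its main() entry point, and compare
the produced output (examples with non-deterministic output are only pattern-checked via
TestBaseUtils.checkLinesAgainstRegexp). Condensed to a single stand-alone test class, as a
sketch restating the testWordCount method from the new StreamingExamplesITCase shown in
part [1/3] of this thread:

    import org.apache.flink.streaming.scala.examples.wordcount.WordCount
    import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    import org.apache.flink.test.testdata.WordCountData
    import org.apache.flink.test.util.TestBaseUtils

    import org.junit.Test

    class WordCountExampleITCase extends StreamingMultipleProgramsTestBase {

      @Test
      def testWordCount(): Unit = {
        // write the test input to a temp file and reserve a temp output path
        val textPath = createTempFile("text.txt", WordCountData.TEXT)
        val resultPath = getTempDirPath("result")

        // run the example end-to-end through its command-line entry point
        WordCount.main(Array("--input", textPath, "--output", resultPath))

        // order-insensitive comparison of the produced lines
        TestBaseUtils.compareResultsByLinesInMemory(
          WordCountData.STREAMING_COUNTS_AS_TUPLES,
          resultPath)
      }
    }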


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cf16207
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cf16207
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cf16207

Branch: refs/heads/master
Commit: 1cf162079c990e0504d736df768b0a8729d8faab
Parents: e3bef55
Author: Lim Chee Hau <ch...@gmail.com>
Authored: Sun Nov 6 00:41:59 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jun 29 23:58:17 2017 +0200

----------------------------------------------------------------------
 .../ml/IncrementalLearningSkeleton.java         |   2 +-
 .../GroupedProcessingTimeWindowExample.java     |  74 ++++----
 .../examples/windowing/SessionWindowing.java    |   7 +-
 .../examples/wordcount/PojoExample.java         | 160 ----------------
 .../examples/iteration/IterateExample.scala     | 135 ++++++++++++++
 .../scala/examples/join/WindowJoin.scala        |   6 +-
 .../scala/examples/kafka/ReadFromKafka.scala    |  73 ++++++++
 .../scala/examples/kafka/WriteIntoKafka.scala   |  84 +++++++++
 .../ml/IncrementalLearningSkeleton.scala        | 184 +++++++++++++++++++
 .../examples/socket/SocketWindowWordCount.scala |   8 +-
 .../scala/examples/twitter/TwitterExample.scala | 148 +++++++++++++++
 .../GroupedProcessingTimeWindowExample.scala    |  86 +++++++++
 .../examples/windowing/SessionWindowing.scala   |  92 ++++++++++
 .../examples/windowing/WindowWordCount.scala    | 100 ++++++++++
 .../scala/examples/wordcount/WordCount.scala    |  91 +++++++++
 .../streaming/test/StreamingExamplesITCase.java | 154 ++++++++++++++++
 .../iteration/IterateExampleITCase.java         |  51 -----
 .../test/examples/join/WindowJoinITCase.java    |  84 ---------
 .../ml/IncrementalLearningSkeletonITCase.java   |  45 -----
 .../examples/twitter/TwitterStreamITCase.java   |  45 -----
 .../windowing/SessionWindowingITCase.java       |  45 -----
 .../windowing/WindowWordCountITCase.java        |  57 ------
 .../examples/wordcount/PojoExampleITCase.java   |  50 -----
 .../examples/wordcount/WordCountITCase.java     |  50 -----
 .../examples/StreamingExamplesITCase.scala      | 158 ++++++++++++++++
 .../scala/examples/WindowJoinITCase.scala       |  75 --------
 26 files changed, 1354 insertions(+), 710 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 0063669..7908273 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -181,7 +181,7 @@ public class IncrementalLearningSkeleton {
 	 * Creates newData using the model produced in batch-processing and the
 	 * up-to-date partial model.
 	 * <p>
-	 * By defaults emits the Integer 0 for every newData and the Integer 1
+	 * By default emits the Integer 0 for every newData and the Integer 1
 	 * for every model update.
 	 * </p>
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index bedc130..1837314 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -34,9 +34,9 @@ import org.apache.flink.util.Collector;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 /**
- * Example of grouped processing time windows.
+ * An example of grouped stream windowing into sliding time windows.
+ * This example uses a {@link RichParallelSourceFunction} to generate a list of key-value pairs.
  */
-@SuppressWarnings("serial")
 public class GroupedProcessingTimeWindowExample {
 
 	public static void main(String[] args) throws Exception {
@@ -44,39 +44,7 @@ public class GroupedProcessingTimeWindowExample {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(4);
 
-		DataStream<Tuple2<Long, Long>> stream = env
-				.addSource(new RichParallelSourceFunction<Tuple2<Long, Long>>() {
-
-					private volatile boolean running = true;
-
-					@Override
-					public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-
-						final long startTime = System.currentTimeMillis();
-
-						final long numElements = 20000000;
-						final long numKeys = 10000;
-						long val = 1L;
-						long count = 0L;
-
-						while (running && count < numElements) {
-							count++;
-							ctx.collect(new Tuple2<>(val++, 1L));
-
-							if (val > numKeys) {
-								val = 1L;
-							}
-						}
-
-						final long endTime = System.currentTimeMillis();
-						System.out.println("Took " + (endTime - startTime) + " msecs for " + numElements + " values");
-					}
-
-					@Override
-					public void cancel() {
-						running = false;
-					}
-				});
+		DataStream<Tuple2<Long, Long>> stream = env.addSource(new DataSource());
 
 		stream
 			.keyBy(0)
@@ -126,4 +94,40 @@ public class GroupedProcessingTimeWindowExample {
 			return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
 		}
 	}
+
+	/**
+	 * Parallel data source that serves a list of key-value pairs.
+	 */
+	private static class DataSource extends RichParallelSourceFunction<Tuple2<Long, Long>> {
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+			final long startTime = System.currentTimeMillis();
+
+			final long numElements = 20000000;
+			final long numKeys = 10000;
+			long val = 1L;
+			long count = 0L;
+
+			while (running && count < numElements) {
+				count++;
+				ctx.collect(new Tuple2<>(val++, 1L));
+
+				if (val > numKeys) {
+					val = 1L;
+				}
+			}
+
+			final long endTime = System.currentTimeMillis();
+			System.out.println("Took " + (endTime - startTime) + " msecs for " + numElements + " values");
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index e4f4522..0c02d5b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -31,8 +31,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * An example of session windowing where events are keyed by ID and grouped
- * and counted within session windows with a timeout of 3 time units.
+ * An example of session windowing that keys events by ID and groups and counts them in
+ * session windows with gaps of 3 milliseconds.
  */
 public class SessionWindowing {
 
@@ -70,9 +70,6 @@ public class SessionWindowing {
 						for (Tuple3<String, Long, Integer> value : input) {
 							ctx.collectWithTimestamp(value, value.f1);
 							ctx.emitWatermark(new Watermark(value.f1 - 1));
-							if (!fileOutput) {
-								System.out.println("Collected: " + value);
-							}
 						}
 						ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
deleted file mode 100644
index a8ece16..0000000
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.wordcount;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
-import org.apache.flink.util.Collector;
-
-/**
- * This example shows an implementation of WordCount without using the Tuple2
- * type, but a custom class.
- *
- * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>use POJO data types,
- * <li>write a simple Flink program,
- * <li>write and use user-defined functions.
- * </ul>
- */
-public class PojoExample {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		// Checking input parameters
-		final ParameterTool params = ParameterTool.fromArgs(args);
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// make parameters available in the web interface
-		env.getConfig().setGlobalJobParameters(params);
-
-		// get input data
-		DataStream<String> text;
-		if (params.has("input")) {
-			System.out.println("Executing WordCountPojo example with default input data set.");
-			System.out.println("Use --input to specify file input.");
-			// read the text file from given input path
-			text = env.readTextFile(params.get("input"));
-		} else {
-			// get default test text data
-			text = env.fromElements(WordCountData.WORDS);
-		}
-
-		DataStream<Word> counts =
-		// split up the lines into Word objects
-		text.flatMap(new Tokenizer())
-		// group by the field word and sum up the frequency
-				.keyBy("word").sum("frequency");
-
-		if (params.has("output")) {
-			counts.writeAsText(params.get("output"));
-		} else {
-			System.out.println("Printing result to stdout. Use --output to specify output path.");
-			counts.print();
-		}
-
-		// execute program
-		env.execute("WordCount Pojo Example");
-	}
-
-	// *************************************************************************
-	// DATA TYPES
-	// *************************************************************************
-
-	/**
-	 * This is the POJO (Plain Old Java Object) that is being used for all the
-	 * operations. As long as all fields are public or have a getter/setter, the
-	 * system can handle them
-	 */
-	public static class Word {
-
-		private String word;
-		private Integer frequency;
-
-		public Word() {
-		}
-
-		public Word(String word, int i) {
-			this.word = word;
-			this.frequency = i;
-		}
-
-		public String getWord() {
-			return word;
-		}
-
-		public void setWord(String word) {
-			this.word = word;
-		}
-
-		public Integer getFrequency() {
-			return frequency;
-		}
-
-		public void setFrequency(Integer frequency) {
-			this.frequency = frequency;
-		}
-
-		@Override
-		public String toString() {
-			return "(" + word + "," + frequency + ")";
-		}
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a
-	 * user-defined FlatMapFunction. The function takes a line (String) and
-	 * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
-	 * Integer>}).
-	 */
-	public static final class Tokenizer implements FlatMapFunction<String, Word> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<Word> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Word(token, 1));
-				}
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
new file mode 100644
index 0000000..ecbf7c5
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+ * Example illustrating iterations in Flink streaming.
+ *
+ * The program sums up random numbers and counts the additions
+ * it performs to reach a specific threshold in an iterative streaming fashion.
+ *
+ * This example shows how to use:
+ *
+ *  - streaming iterations,
+ *  - buffer timeout to reduce latency,
+ *  - directed outputs.
+ *
+ */
+object IterateExample {
+
+  private val Bound = 100
+
+  def main(args: Array[String]): Unit = {
+    // Checking input parameters
+    val params = ParameterTool.fromArgs(args)
+
+    // obtain execution environment and set the buffer timeout to 1 ms to enable
+    // continuous flushing of the output buffers (lowest latency)
+    val env = StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
+
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
+    // create input stream of integer pairs
+    val inputStream: DataStream[(Int, Int)] =
+    if (params.has("input")) {
+      // map a list of strings to integer pairs
+      env.readTextFile(params.get("input")).map { value: String =>
+        val record = value.substring(1, value.length - 1)
+        val splitted = record.split(",")
+        (Integer.parseInt(splitted(0)), Integer.parseInt(splitted(1)))
+      }
+    } else {
+      println("Executing Iterate example with default input data set.")
+      println("Use --input to specify file input.")
+      env.addSource(new RandomFibonacciSource)
+    }
+
+    def withinBound(value: (Int, Int)) = value._1 < Bound && value._2 < Bound
+
+    // create an iterative data stream from the input with 5 second timeout
+    val numbers: DataStream[((Int, Int), Int)] = inputStream
+      // Map the inputs so that the next Fibonacci numbers can be calculated
+      // while preserving the original input tuple
+      // A counter is attached to the tuple and incremented in every iteration step
+      .map(value => (value._1, value._2, value._1, value._2, 0))
+      .iterate(
+        (iteration: DataStream[(Int, Int, Int, Int, Int)]) => {
+          // calculate the next Fibonacci number and increment the counter
+          val step = iteration.map(value =>
+            (value._1, value._2, value._4, value._3 + value._4, value._5 + 1))
+          // testing which tuple needs to be iterated again
+          val feedback = step.filter(value => withinBound(value._3, value._4))
+          // giving back the input pair and the counter
+          val output: DataStream[((Int, Int), Int)] = step
+            .filter(value => !withinBound(value._3, value._4))
+            .map(value => ((value._1, value._2), value._5))
+          (feedback, output)
+        }
+        // timeout after 5 seconds
+        , 5000L
+      )
+
+    if (params.has("output")) {
+      numbers.writeAsText(params.get("output"))
+    } else {
+      println("Printing result to stdout. Use --output to specify output path.")
+      numbers.print()
+    }
+
+    env.execute("Streaming Iteration Example")
+  }
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   * Generates BOUND random integer pairs in the range from 1 to BOUND/2.
+   */
+  private class RandomFibonacciSource extends SourceFunction[(Int, Int)] {
+
+    val rnd = new Random()
+    var counter = 0
+    @volatile var isRunning = true
+
+    override def run(ctx: SourceContext[(Int, Int)]): Unit = {
+
+      while (isRunning && counter < Bound) {
+        val first = rnd.nextInt(Bound / 2 - 1) + 1
+        val second = rnd.nextInt(Bound / 2 - 1) + 1
+
+        ctx.collect((first, second))
+        counter += 1
+        Thread.sleep(50L)
+      }
+    }
+
+    override def cancel(): Unit = isRunning = false
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 73be261..0e6fc37 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -78,10 +78,10 @@ object WindowJoin {
     joined.print().setParallelism(1)
 
     // execute program
-    env.execute("WindowJoin")
+    env.execute("Windowed Join Example")
   }
-  
-  
+
+
   def joinStreams(
       grades: DataStream[Grade],
       salaries: DataStream[Salary],

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
new file mode 100644
index 0000000..3127ab7
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.kafka
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+
+/**
+ * Read Strings from Kafka and print them to standard out.
+ * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file!
+ *
+ * Please pass the following arguments to run the example:
+ * {{{
+ * --topic test
+ * --bootstrap.servers localhost:9092
+ * --zookeeper.connect localhost:2181
+ * --group.id myconsumer
+ * }}}
+ */
+object ReadFromKafka {
+
+  def main(args: Array[String]): Unit = {
+
+    // parse input arguments
+    val params = ParameterTool.fromArgs(args)
+
+    if (params.getNumberOfParameters < 4) {
+      println("Missing parameters!\nUsage: Kafka --topic <topic> " +
+        "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>")
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.disableSysoutLogging
+    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
+    // create a checkpoint every 5 seconds
+    env.enableCheckpointing(5000)
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
+    // create a Kafka streaming source consumer for Kafka 0.8.x
+    val kafkaConsumer = new FlinkKafkaConsumer08(
+      params.getRequired("topic"),
+      new SimpleStringSchema,
+      params.getProperties)
+    val messageStream = env.addSource(kafkaConsumer)
+
+    // write kafka stream to standard out.
+    messageStream.print()
+
+    env.execute("Read from Kafka example")
+  }
+
+}
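
For reference, a minimal stand-alone sketch of the same consumer wired up without
ParameterTool. The object name, topic, and addresses below are placeholders, not part
of the commit; the constructor and schema are the ones used above.

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object ReadFromKafkaSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092")
        props.setProperty("zookeeper.connect", "localhost:2181") // the 0.8 consumer needs ZooKeeper
        props.setProperty("group.id", "myconsumer")
        env
          .addSource(new FlinkKafkaConsumer08[String]("test", new SimpleStringSchema, props))
          .print()
        env.execute("ReadFromKafka sketch")
      }
    }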

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala
new file mode 100644
index 0000000..e34083a
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.kafka
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+
+/**
+ * Generate a String every 500 ms and write it into a Kafka topic.
+ *
+ * Please pass the following arguments to run the example:
+ * {{{
+ * --topic test
+ * --bootstrap.servers localhost:9092
+ * }}}
+ */
+object WriteIntoKafka {
+
+  def main(args: Array[String]): Unit = {
+
+    // parse input arguments
+    val params = ParameterTool.fromArgs(args)
+
+    if (params.getNumberOfParameters < 2) {
+      println("Missing parameters!")
+      println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>")
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.disableSysoutLogging
+    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
+
+    // very simple data generator
+    val messageStream: DataStream[String] = env.addSource(new SourceFunction[String]() {
+      // @volatile: cancel() is invoked from a different thread than run(),
+      // so the flag update must be visible across threads
+      @volatile var running = true
+
+      override def run(ctx: SourceContext[String]): Unit = {
+        var i = 0L
+        while (this.running) {
+          ctx.collect(s"Element - ${i}")
+          i += 1
+          Thread.sleep(500)
+        }
+      }
+
+      override def cancel(): Unit = running = false
+    })
+
+    // create a Kafka producer for Kafka 0.8.x
+    val kafkaProducer = new FlinkKafkaProducer08(
+      params.getRequired("topic"),
+      new SimpleStringSchema,
+      params.getProperties)
+
+    // write data into Kafka
+    messageStream.addSink(kafkaProducer)
+
+    env.execute("Write into Kafka example")
+  }
+
+}
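
Analogously, a minimal stand-alone sketch of the producer side with explicit
properties; only bootstrap.servers is needed here, since the Kafka producer does not
talk to ZooKeeper. The object name, topic, and address below are placeholders.

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object WriteIntoKafkaSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092")
        env
          .fromElements("Element - 0", "Element - 1", "Element - 2")
          .addSink(new FlinkKafkaProducer08[String]("test", new SimpleStringSchema, props))
        env.execute("WriteIntoKafka sketch")
      }
    }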

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
new file mode 100644
index 0000000..7b40cd5
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+ * Skeleton for an incremental machine learning algorithm: a pre-computed
+ * model is continuously updated with new training data, while the job
+ * provides predictions for new input data.
+ *
+ * This may serve as a base for a number of algorithms, e.g. updating an
+ * incremental Alternating Least Squares model while also providing the
+ * predictions.
+ *
+ * This example shows how to use:
+ *
+ *  - Connected streams
+ *  - CoFunctions
+ *  - Tuple data types
+ *
+ */
+object IncrementalLearningSkeleton {
+
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
+
+  def main(args: Array[String]): Unit = {
+    // Checking input parameters
+    val params = ParameterTool.fromArgs(args)
+
+    // set up the execution environment
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    // build new model on every second of new data
+    val trainingData: DataStream[Int] = env.addSource(new FiniteTrainingDataSource)
+    val newData: DataStream[Int] = env.addSource(new FiniteNewDataSource)
+
+    val model: DataStream[Array[Double]] = trainingData
+      .assignTimestampsAndWatermarks(new LinearTimestamp)
+      .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+      .apply(new PartialModelBuilder)
+
+    // use partial model for newData
+    val prediction: DataStream[Int] = newData.connect(model).map(new Predictor)
+
+    // emit result
+    if (params.has("output")) {
+      prediction.writeAsText(params.get("output"))
+    } else {
+      println("Printing result to stdout. Use --output to specify output path.")
+      prediction.print()
+    }
+
+    // execute program
+    env.execute("Streaming Incremental Learning")
+  }
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   * Feeds new data into the prediction stream. By default it is implemented
+   * as emitting the Integer 1 in a loop.
+   */
+  private class FiniteNewDataSource extends SourceFunction[Int] {
+    override def run(ctx: SourceContext[Int]) = {
+      Thread.sleep(15)
+      (0 until 50).foreach{ _ =>
+        Thread.sleep(5)
+        ctx.collect(1)
+      }
+    }
+
+    override def cancel() = {
+      // No cleanup needed
+    }
+  }
+
+  /**
+   * Feeds new training data to the partial model builder. By default it is
+   * implemented as emitting the Integer 1 in a loop.
+   */
+  private class FiniteTrainingDataSource extends SourceFunction[Int] {
+    override def run(ctx: SourceContext[Int]) = (0 until 8200).foreach( _ => ctx.collect(1) )
+
+    override def cancel() = {
+      // No cleanup needed
+    }
+  }
+
+  private class LinearTimestamp extends AssignerWithPunctuatedWatermarks[Int] {
+    var counter = 0L
+
+    override def extractTimestamp(element: Int, previousElementTimestamp: Long): Long = {
+      counter += 10L
+      counter
+    }
+
+    override def checkAndGetNextWatermark(lastElement: Int, extractedTimestamp: Long) = {
+      new Watermark(counter - 1)
+    }
+  }
+
+  /**
+   * Builds up-to-date partial models on new training data.
+   */
+  private class PartialModelBuilder extends AllWindowFunction[Int, Array[Double], TimeWindow] {
+
+    protected def buildPartialModel(values: Iterable[Int]): Array[Double] = Array[Double](1)
+
+    override def apply(window: TimeWindow,
+                       values: Iterable[Int],
+                       out: Collector[Array[Double]]): Unit = {
+      out.collect(buildPartialModel(values))
+    }
+  }
+
+  /**
+   * Creates predictions for new data using the model produced in batch
+   * processing and the up-to-date partial model.
+   *
+   * By default it emits the Integer 0 for every prediction and the Integer 1
+   * for every model update.
+   *
+   */
+  private class Predictor extends CoMapFunction[Int, Array[Double], Int] {
+
+    var batchModel: Array[Double] = null
+    var partialModel: Array[Double] = null
+
+    override def map1(value: Int): Int = {
+      // Return the prediction for the new data element
+      predict(value)
+    }
+
+    override def map2(value: Array[Double]): Int = {
+      // Update model
+      partialModel = value
+      batchModel = getBatchModel()
+      1
+    }
+
+    // pulls model built with batch-job on the old training data
+    protected def getBatchModel(): Array[Double] = Array[Double](0)
+
+    // performs the prediction using the two models
+    protected def predict(inTuple: Int): Int = 0
+  }
+
+}
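
A quick sanity check of the event-time mechanics above, assuming Flink's default
epoch-aligned tumbling windows: LinearTimestamp advances the counter by 10 ms per
element, so each 5000 ms window covers 500 training elements, and the 8200-element
finite source yields 17 windows, the last one only partially filled.

    // back-of-the-envelope arithmetic for the windowing above
    val msPerElement = 10                            // LinearTimestamp increment
    val windowMs = 5000                              // timeWindowAll size
    val elementsPerWindow = windowMs / msPerElement  // 500
    val lastTimestamp = 8200 * msPerElement          // 82000
    val windowCount = lastTimestamp / windowMs + 1   // 17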

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
index d2afa4d..e607f61 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
@@ -27,10 +27,10 @@ import org.apache.flink.streaming.api.windowing.time.Time
  * 
  * This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
- * using the <i>netcat</i> tool via
- * <pre>
+ * using the ''netcat'' tool via
+ * {{{
  * nc -l 12345
- * </pre>
+ * }}}
 * and run this example with the hostname and the port as arguments.
  */
 object SocketWindowWordCount {
@@ -61,7 +61,7 @@ object SocketWindowWordCount {
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     
     // get input data by connecting to the socket
-    val text = env.socketTextStream(hostname, port, '\n')
+    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
 
     // parse the data, group it, window it, and aggregate the counts 
     val windowCounts = text

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
new file mode 100644
index 0000000..d916116
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.apache.flink.util.Collector
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+/**
+ * Implements the "TwitterStream" program that computes word occurrences
+ * over JSON-encoded Tweets in a streaming fashion.
+ *
+ * The input is a Tweet stream from a TwitterSource.
+ *
+ * Usage:
+ * {{{
+ * TwitterExample [--output <path>]
+ * [--twitter-source.consumerKey <key>
+ * --twitter-source.consumerSecret <secret>
+ * --twitter-source.token <token>
+ * --twitter-source.tokenSecret <tokenSecret>]
+ * }}}
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[TwitterExampleData]].
+ *
+ * This example shows how to:
+ *
+ *  - acquire external data,
+ *  - use in-line defined functions,
+ *  - handle flattened stream inputs.
+ *
+ */
+object TwitterExample {
+
+  def main(args: Array[String]): Unit = {
+
+    // Checking input parameters
+    val params = ParameterTool.fromArgs(args)
+    println("Usage: TwitterExample [--output <path>] " +
+      "[--twitter-source.consumerKey <key> " +
+      "--twitter-source.consumerSecret <secret> " +
+      "--twitter-source.token <token> " +
+      "--twitter-source.tokenSecret <tokenSecret>]")
+
+    // set up the execution environment
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
+    env.setParallelism(params.getInt("parallelism", 1))
+
+    // get input data
+    val streamSource: DataStream[String] =
+    if (params.has(TwitterSource.CONSUMER_KEY) &&
+      params.has(TwitterSource.CONSUMER_SECRET) &&
+      params.has(TwitterSource.TOKEN) &&
+      params.has(TwitterSource.TOKEN_SECRET)
+    ) {
+      env.addSource(new TwitterSource(params.getProperties))
+    } else {
+      println("Executing TwitterStream example with default props.")
+      println("Use --twitter-source.consumerKey <key> --twitter-source.consumerSecret <secret> " +
+        "--twitter-source.token <token> " +
+        "--twitter-source.tokenSecret <tokenSecret> to specify the authentication info."
+      )
+      // get default test text data
+      env.fromElements(TwitterExampleData.TEXTS: _*)
+    }
+
+    val tweets: DataStream[(String, Int)] = streamSource
+      // selecting English tweets and splitting to (word, 1)
+      .flatMap(new SelectEnglishAndTokenizeFlatMap)
+      // group by words and sum their occurrences
+      .keyBy(0).sum(1)
+
+    // emit result
+    if (params.has("output")) {
+      tweets.writeAsText(params.get("output"))
+    } else {
+      println("Printing result to stdout. Use --output to specify output path.")
+      tweets.print()
+    }
+
+    // execute program
+    env.execute("Twitter Streaming Example")
+  }
+
+  /**
+   * Deserialize JSON from twitter source
+   *
+   * Implements a string tokenizer that splits sentences into words as a
+   * user-defined FlatMapFunction. The function takes a line (String) and
+   * splits it into multiple pairs in the form of "(word,1)" ({{{ (String, Int) }}}).
+   */
+  private class SelectEnglishAndTokenizeFlatMap extends FlatMapFunction[String, (String, Int)] {
+    lazy val jsonParser = new ObjectMapper()
+
+    override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
+      // deserialize JSON from twitter source
+      val jsonNode = jsonParser.readValue(value, classOf[JsonNode])
+      val isEnglish = jsonNode.has("user") &&
+        jsonNode.get("user").has("lang") &&
+        jsonNode.get("user").get("lang").asText == "en"
+      val hasText = jsonNode.has("text")
+
+      (isEnglish, hasText, jsonNode) match {
+        case (true, true, node) => {
+          val tokenizer = new StringTokenizer(node.get("text").asText())
+
+          while (tokenizer.hasMoreTokens) {
+            val token = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase()
+            if (token.nonEmpty) out.collect((token, 1))
+          }
+        }
+        case _ =>
+      }
+    }
+  }
+}
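
The language check in SelectEnglishAndTokenizeFlatMap can be tried out in isolation.
A minimal sketch with a hand-written sample Tweet (the JSON below is illustrative,
not real TwitterSource output):

    import org.codehaus.jackson.JsonNode
    import org.codehaus.jackson.map.ObjectMapper

    object TweetFilterSketch {
      def main(args: Array[String]): Unit = {
        val sample = """{"user":{"lang":"en"},"text":"Hello streaming world"}"""
        val node = new ObjectMapper().readValue(sample, classOf[JsonNode])
        val isEnglish = node.has("user") &&
          node.get("user").has("lang") &&
          node.get("user").get("lang").asText == "en"
        // prints true; the flatMap would then emit ("hello",1), ("streaming",1), ("world",1)
        println(isEnglish)
      }
    }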

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
new file mode 100644
index 0000000..090b3f5
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import java.util.concurrent.TimeUnit.MILLISECONDS
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+ * An example of grouped stream windowing into sliding time windows.
+ * This example uses [[RichParallelSourceFunction]] to generate a list of key-value pairs.
+ */
+object GroupedProcessingTimeWindowExample {
+
+  def main(args: Array[String]): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(4)
+
+    val stream: DataStream[(Long, Long)] = env.addSource(new DataSource)
+
+    stream
+      .keyBy(0)
+      .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
+      .reduce((value1, value2) => (value1._1, value1._2 + value2._2))
+      .addSink(new SinkFunction[(Long, Long)]() {
+        override def invoke(in: (Long, Long)): Unit = {}
+      })
+
+    env.execute()
+  }
+
+  /**
+   * Parallel data source that serves a list of key-value pairs.
+   */
+  private class DataSource extends RichParallelSourceFunction[(Long, Long)] {
+    @volatile private var running = true
+
+    override def run(ctx: SourceContext[(Long, Long)]): Unit = {
+      val startTime = System.currentTimeMillis()
+
+      val numElements = 20000000
+      val numKeys = 10000
+      var value = 1L
+      var count = 0L
+
+      while (running && count < numElements) {
+
+        ctx.collect((value, 1L))
+
+        count += 1
+        value += 1
+
+        if (value > numKeys) {
+          value = 1L
+        }
+      }
+
+      val endTime = System.currentTimeMillis()
+      println(s"Took ${endTime - startTime} msecs for ${numElements} values")
+    }
+
+    override def cancel(): Unit = running = false
+  }
+}
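
Two details of the job above are worth spelling out: with a 2500 ms window sliding
every 500 ms, each record lands in 2500 / 500 = 5 overlapping windows, and the reduce
keeps the key from the first tuple while summing the counts. A REPL-sized sketch of
the same merge logic on plain tuples:

    val merge = (a: (Long, Long), b: (Long, Long)) => (a._1, a._2 + b._2)
    println(merge((1L, 2L), (1L, 3L))) // (1,5)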

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
new file mode 100644
index 0000000..3723674
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+ * An example of grouped stream windowing in session windows with a session
+ * timeout of 3 milliseconds. A source emits elements with key, timestamp, and count.
+ */
+object SessionWindowing {
+
+  def main(args: Array[String]): Unit = {
+
+    val params = ParameterTool.fromArgs(args)
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    env.getConfig.setGlobalJobParameters(params)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val fileOutput = params.has("output")
+
+    val input = List(
+      ("a", 1L, 1),
+      ("b", 1L, 1),
+      ("b", 3L, 1),
+      ("b", 5L, 1),
+      ("c", 6L, 1),
+      // We expect to detect session "a" earlier than this point (the old
+      // functionality could only detect it here, when the next session starts)
+      ("a", 10L, 1),
+      // We expect to detect session "b" and "c" at this point as well
+      ("c", 11L, 1)
+    )
+
+    val source: DataStream[(String, Long, Int)] = env.addSource(
+      new SourceFunction[(String, Long, Int)]() {
+
+        override def run(ctx: SourceContext[(String, Long, Int)]): Unit = {
+          input.foreach(value => {
+            ctx.collectWithTimestamp(value, value._2)
+            ctx.emitWatermark(new Watermark(value._2 - 1))
+          })
+          ctx.emitWatermark(new Watermark(Long.MaxValue))
+        }
+
+        override def cancel(): Unit = {}
+
+      })
+
+    // We create sessions for each id with max timeout of 3 time units
+    val aggregated: DataStream[(String, Long, Int)] = source
+      .keyBy(0)
+      .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
+      .sum(2)
+
+    if (fileOutput) {
+      aggregated.writeAsText(params.get("output"))
+    } else {
+      println("Printing result to stdout. Use --output to specify output path.")
+      aggregated.print()
+    }
+
+    env.execute()
+  }
+
+}
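
Working the 3 ms gap through the input above by hand makes the expected sessions
concrete (an assumption worth stating: sum(2) adds the count field, while the other
fields follow the first element of each session):

    // "a": t=1 and t=10 are more than 3 ms apart  -> two sessions: (a,1,1), (a,10,1)
    // "b": t=1, 3, 5 each within 3 ms of the next -> one session:  (b,1,3)
    // "c": t=6 and t=11 are more than 3 ms apart  -> two sessions: (c,6,1), (c,11,1)
    val expectedSessions =
      Seq(("a", 1L, 1), ("b", 1L, 3), ("c", 6L, 1), ("a", 10L, 1), ("c", 11L, 1))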

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
new file mode 100644
index 0000000..349f253
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+
+/**
+ * Implements a windowed version of the streaming "WordCount" program.
+ *
+ * The input is a plain text file with lines separated by newline characters.
+ *
+ * Usage:
+ * {{{
+ * WindowWordCount
+ * --input <path>
+ * --output <path>
+ * --window <n>
+ * --slide <n>
+ * }}}
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[WordCountData]].
+ *
+ * This example shows how to:
+ *
+ *  - write a simple Flink Streaming program,
+ *  - use tuple data types,
+ *  - use basic windowing abstractions.
+ *
+ */
+object WindowWordCount {
+
+  def main(args: Array[String]): Unit = {
+
+    val params = ParameterTool.fromArgs(args)
+
+    // set up the execution environment
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // get input data
+    val text =
+    if (params.has("input")) {
+      // read the text file from given input path
+      env.readTextFile(params.get("input"))
+    } else {
+      println("Executing WindowWordCount example with default input data set.")
+      println("Use --input to specify file input.")
+      // get default test text data
+      env.fromElements(WordCountData.WORDS: _*)
+    }
+
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
+    val windowSize = params.getInt("window", 250)
+    val slideSize = params.getInt("slide", 150)
+
+    val counts: DataStream[(String, Int)] = text
+      // split up the lines in pairs (2-tuple) containing: (word,1)
+      .flatMap(_.toLowerCase.split("\\W+"))
+      .filter(_.nonEmpty)
+      .map((_, 1))
+      .keyBy(0)
+      // create windows of windowSize records, sliding every slideSize records
+      .countWindow(windowSize, slideSize)
+      // group by the tuple field "0" and sum up tuple field "1"
+      .sum(1)
+
+    // emit result
+    if (params.has("output")) {
+      counts.writeAsText(params.get("output"))
+    } else {
+      println("Printing result to stdout. Use --output to specify output path.")
+      counts.print()
+    }
+
+    // execute program
+    env.execute("WindowWordCount")
+  }
+
+}
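
Note that countWindow(windowSize, slideSize) above is a count window despite the
time-flavoured parameter names: per key, a window fires every slideSize records and
covers up to the last windowSize records. A REPL-sized sketch of the firing points
with the defaults:

    val windowSize = 250
    val slideSize = 150
    // for one key, windows fire after 150, 300, 450, ... records;
    // the first firing covers records 1-150 (window not yet full),
    // the second covers records 51-300, and so on
    val firings = (1 to 3).map(_ * slideSize) // Vector(150, 300, 450)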

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
new file mode 100644
index 0000000..74b726f
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.wordcount
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence
+ * histogram over text files in a streaming fashion.
+ *
+ * The input is a plain text file with lines separated by newline characters.
+ *
+ * Usage:
+ * {{{
+ * WordCount --input <path> --output <path>
+ * }}}
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[WordCountData]].
+ *
+ * This example shows how to:
+ *
+ *  - write a simple Flink Streaming program,
+ *  - use tuple data types,
+ *  - write and use transformation functions.
+ *
+ */
+object WordCount {
+
+  def main(args: Array[String]): Unit = {
+
+    // Checking input parameters
+    val params = ParameterTool.fromArgs(args)
+
+    // set up the execution environment
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
+    // get input data
+    val text =
+    // read the text file from given input path
+    if (params.has("input")) {
+      env.readTextFile(params.get("input"))
+    } else {
+      println("Executing WordCount example with default input data set.")
+      println("Use --input to specify file input.")
+      // get default test text data
+      env.fromElements(WordCountData.WORDS: _*)
+    }
+
+    val counts: DataStream[(String, Int)] = text
+      // split up the lines in pairs (2-tuples) containing: (word,1)
+      .flatMap(_.toLowerCase.split("\\W+"))
+      .filter(_.nonEmpty)
+      .map((_, 1))
+      // group by the tuple field "0" and sum up tuple field "1"
+      .keyBy(0)
+      .sum(1)
+
+    // emit result
+    if (params.has("output")) {
+      counts.writeAsText(params.get("output"))
+    } else {
+      println("Printing result to stdout. Use --output to specify output path.")
+      counts.print()
+    }
+
+    // execute program
+    env.execute("Streaming WordCount")
+  }
+}
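
What the transformation chain above does to a single line can be seen with plain
Scala collections, no Flink involved:

    val line = "To be, or not to be"
    val pairs = line.toLowerCase.split("\\W+").filter(_.nonEmpty).map((_, 1))
    // pairs: Array((to,1), (be,1), (or,1), (not,1), (to,1), (be,1))
    // keyBy(0).sum(1) then turns these into running counts per word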

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
new file mode 100644
index 0000000..4c47d59
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
+import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData;
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
+import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
+import org.apache.flink.streaming.test.examples.join.WindowJoinData;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+
+import java.io.File;
+
+/**
+ * Integration test for streaming programs in Java examples.
+ */
+public class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
+
+	@Test
+	public void testIterateExample() throws Exception {
+		final String inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS);
+		final String resultPath = getTempDirPath("result");
+
+		// the example is inherently non-deterministic. The iteration timeout of 5000 ms
+		// is frequently not enough to make the test run stable on CI infrastructure
+		// with very small containers, so we cannot do a validation here
+		org.apache.flink.streaming.examples.iteration.IterateExample.main(new String[]{
+			"--input", inputPath,
+			"--output", resultPath});
+	}
+
+	@Test
+	public void testWindowJoin() throws Exception {
+
+		final String resultPath = File.createTempFile("result-path", "dir").toURI().toString();
+
+		final class Parser implements MapFunction<String, Tuple2<String, Integer>> {
+
+			@Override
+			public Tuple2<String, Integer> map(String value) throws Exception {
+				String[] fields = value.split(",");
+				return new Tuple2<>(fields[1], Integer.parseInt(fields[2]));
+			}
+		}
+
+		try {
+			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+			DataStream<Tuple2<String, Integer>> grades = env
+				.fromElements(WindowJoinData.GRADES_INPUT.split("\n"))
+				.map(new Parser());
+
+			DataStream<Tuple2<String, Integer>> salaries = env
+				.fromElements(WindowJoinData.SALARIES_INPUT.split("\n"))
+				.map(new Parser());
+
+			org.apache.flink.streaming.examples.join.WindowJoin
+				.runWindowJoin(grades, salaries, 100)
+				.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+			env.execute();
+
+			// since the two sides of the join might progress at different speeds,
+			// the exact output cannot be checked, only whether it is well-formed;
+			// checks that the result lines look like e.g. (bob,2,2015)
+			checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
+		} finally {
+			try {
+				FileUtils.deleteDirectory(new File(resultPath));
+			} catch (Throwable ignored) {
+			}
+		}
+	}
+
+	@Test
+	public void testIncrementalLearningSkeleton() throws Exception {
+		final String resultPath = getTempDirPath("result");
+		org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton.main(new String[]{"--output", resultPath});
+		compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath);
+	}
+
+	@Test
+	public void testTwitterStream() throws Exception {
+		final String resultPath = getTempDirPath("result");
+		org.apache.flink.streaming.examples.twitter.TwitterExample.main(new String[]{"--output", resultPath});
+		compareResultsByLinesInMemory(TwitterExampleData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+	@Test
+	public void testSessionWindowing() throws Exception {
+		final String resultPath = getTempDirPath("result");
+		org.apache.flink.streaming.examples.windowing.SessionWindowing.main(new String[]{"--output", resultPath});
+		compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath);
+	}
+
+	@Test
+	public void testWindowWordCount() throws Exception {
+		final String windowSize = "250";
+		final String slideSize = "150";
+		final String textPath = createTempFile("text.txt", WordCountData.TEXT);
+		final String resultPath = getTempDirPath("result");
+
+		org.apache.flink.streaming.examples.windowing.WindowWordCount.main(new String[]{
+			"--input", textPath,
+			"--output", resultPath,
+			"--window", windowSize,
+			"--slide", slideSize});
+
+		// since the parallel tokenizers might progress at different speeds,
+		// the exact output cannot be checked, only whether it is well-formed;
+		// checks that the result lines look like e.g. (faust,2)
+		checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
+	}
+
+	@Test
+	public void testWordCount() throws Exception {
+		final String textPath = createTempFile("text.txt", WordCountData.TEXT);
+		final String resultPath = getTempDirPath("result");
+
+		org.apache.flink.streaming.examples.wordcount.WordCount.main(new String[]{
+			"--input", textPath,
+			"--output", resultPath});
+
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+}
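
The regexp checks in the tests above are shape checks only; note that they do not
allow spaces after the commas. A REPL-sized sanity check with a simplified but
equivalent pattern:

    println("(bob,2,2015)".matches("\\([a-z]+,\\d,\\d+\\)"))   // true
    println("(bob, 2, 2015)".matches("\\([a-z]+,\\d,\\d+\\)")) // false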

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/iteration/IterateExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/iteration/IterateExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/iteration/IterateExampleITCase.java
deleted file mode 100644
index e6ff1d4..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/iteration/IterateExampleITCase.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.iteration;
-
-import org.apache.flink.streaming.examples.iteration.IterateExample;
-import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-/**
- * Tests for {@link IterateExample}.
- */
-public class IterateExampleITCase extends StreamingProgramTestBase {
-
-	protected String inputPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		// the example is inherently non-deterministic. The iteration timeout of 5000 ms
-		// is frequently not enough to make the test run stable on CI infrastructure
-		// with very small containers, so we cannot do a validation here
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		IterateExample.main(new String[]{
-				"--input", inputPath,
-				"--output", resultPath});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/join/WindowJoinITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/join/WindowJoinITCase.java
deleted file mode 100644
index 525ff6f..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/join/WindowJoinITCase.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.join;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.join.WindowJoin;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Test;
-
-import java.io.File;
-
-/**
- * Tests for {@link WindowJoin}.
- */
-@SuppressWarnings("serial")
-public class WindowJoinITCase extends StreamingMultipleProgramsTestBase {
-
-	@Test
-	public void testProgram() throws Exception {
-		final String resultPath = File.createTempFile("result-path", "dir").toURI().toString();
-		try {
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-			DataStream<Tuple2<String, Integer>> grades = env
-					.fromElements(WindowJoinData.GRADES_INPUT.split("\n"))
-					.map(new Parser());
-
-			DataStream<Tuple2<String, Integer>> salaries = env
-					.fromElements(WindowJoinData.SALARIES_INPUT.split("\n"))
-					.map(new Parser());
-
-			WindowJoin
-					.runWindowJoin(grades, salaries, 100)
-					.writeAsText(resultPath, WriteMode.OVERWRITE);
-
-			env.execute();
-
-			// since the two sides of the join might have different speed
-			// the exact output can not be checked just whether it is well-formed
-			// checks that the result lines look like e.g. (bob, 2, 2015)
-			checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
-		}
-		finally {
-			try {
-				FileUtils.deleteDirectory(new File(resultPath));
-			} catch (Throwable ignored) {}
-		}
-	}
-
-	//-------------------------------------------------------------------------
-
-	private static final class Parser implements MapFunction<String, Tuple2<String, Integer>> {
-
-		@Override
-		public Tuple2<String, Integer> map(String value) throws Exception {
-			String[] fields = value.split(",");
-			return new Tuple2<>(fields[1], Integer.parseInt(fields[2]));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/ml/IncrementalLearningSkeletonITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/ml/IncrementalLearningSkeletonITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/ml/IncrementalLearningSkeletonITCase.java
deleted file mode 100644
index d5b160d..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/ml/IncrementalLearningSkeletonITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.ml;
-
-import org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton;
-import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-/**
- * Tests for {@link IncrementalLearningSkeleton}.
- */
-public class IncrementalLearningSkeletonITCase extends StreamingProgramTestBase {
-
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		IncrementalLearningSkeleton.main(new String[]{"--output", resultPath});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/twitter/TwitterStreamITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/twitter/TwitterStreamITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/twitter/TwitterStreamITCase.java
deleted file mode 100644
index 7f3b440..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/twitter/TwitterStreamITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.twitter;
-
-import org.apache.flink.streaming.examples.twitter.TwitterExample;
-import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-/**
- * Tests for {@link TwitterExample}.
- */
-public class TwitterStreamITCase extends StreamingProgramTestBase {
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(TwitterExampleData.STREAMING_COUNTS_AS_TUPLES, resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		TwitterExample.main(new String[]{"--output", resultPath});
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/SessionWindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/SessionWindowingITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/SessionWindowingITCase.java
deleted file mode 100644
index 768ed11..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/SessionWindowingITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.windowing;
-
-import org.apache.flink.streaming.examples.windowing.SessionWindowing;
-import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-/**
- * Tests for {@link SessionWindowing}.
- */
-public class SessionWindowingITCase extends StreamingProgramTestBase {
-
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		SessionWindowing.main(new String[]{"--output", resultPath});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/WindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/WindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/WindowWordCountITCase.java
deleted file mode 100644
index 0025d94..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/WindowWordCountITCase.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.windowing;
-
-import org.apache.flink.streaming.examples.windowing.WindowWordCount;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-/**
- * Tests for {@link WindowWordCount}.
- */
-public class WindowWordCountITCase extends StreamingProgramTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-	protected String windowSize = "250";
-	protected String slideSize = "150";
-
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		// since the parallel tokenizers might run at different speeds,
-		// the exact output cannot be checked, only that it is well-formed;
-		// check that the result lines look like e.g. (faust, 2)
-		checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		WindowWordCount.main(new String[]{
-				"--input", textPath,
-				"--output", resultPath,
-				"--window", windowSize,
-				"--slide", slideSize});
-	}
-}

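The same consolidation applies to the regex-based check above. A sketch under the same assumptions; createTempFile and checkLinesAgainstRegexp are the TestBaseUtils helpers the deleted test already relied on:

  @Test
  def testWindowWordCount(): Unit = {
    // sketch: method name and body are illustrative, not the committed code
    val textPath = createTempFile("text.txt", WordCountData.TEXT)
    val resultPath = getTempDirPath("result")
    WindowWordCount.main(Array(
      "--input", textPath,
      "--output", resultPath,
      "--window", "250",
      "--slide", "150"))
    // the parallel tokenizers make the exact counts nondeterministic,
    // so only check that each result line looks like e.g. (faust, 2)
    TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)")
  }
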
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/PojoExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/PojoExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/PojoExampleITCase.java
deleted file mode 100644
index 76adb0d..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/PojoExampleITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.wordcount;
-
-import org.apache.flink.streaming.examples.wordcount.PojoExample;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-/**
- * Tests for {@link PojoExample}.
- */
-public class PojoExampleITCase extends StreamingProgramTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		PojoExample.main(new String[]{
-				"--input", textPath,
-				"--output", resultPath});
-	}
-}

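The PojoExampleITCase removed above is structurally identical to the WordCountITCase that follows; see the sketch after that diff for how both reduce to a single method in the consolidated Scala suite.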
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/WordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/WordCountITCase.java
deleted file mode 100644
index 96dfafb..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/WordCountITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.wordcount;
-
-import org.apache.flink.streaming.examples.wordcount.WordCount;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-/**
- * Tests for {@link WordCount}.
- */
-public class WordCountITCase extends StreamingProgramTestBase {
-
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		WordCount.main(new String[]{
-				"--input", textPath,
-				"--output", resultPath});
-	}
-}
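
The deleted WordCountITCase (and the PojoExampleITCase above) boil down to the same pattern in the consolidated Scala StreamingExamplesITCase. A minimal sketch, again assuming the suite keeps the TestBaseUtils helpers; the method name and body are illustrative. Every call here appears verbatim in the deleted Java test, only collapsed into one method:

  @Test
  def testWordCount(): Unit = {
    // sketch: same lifecycle as the deleted Java test, in a single method
    val textPath = createTempFile("text.txt", WordCountData.TEXT)
    val resultPath = getTempDirPath("result")
    WordCount.main(Array("--input", textPath, "--output", resultPath))
    TestBaseUtils.compareResultsByLinesInMemory(
      WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath)
  }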