Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/29 21:58:21 UTC
[1/3] flink git commit: [FLINK-3551] [examples] Sync Scala streaming examples with Java examples.
Repository: flink
Updated Branches:
refs/heads/master e3bef5569 -> 55ab34ff3
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
new file mode 100644
index 0000000..0a26a35
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
+import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData
+import org.apache.flink.streaming.scala.examples.iteration.IterateExample
+import org.apache.flink.streaming.scala.examples.join.WindowJoin
+import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
+import org.apache.flink.streaming.scala.examples.ml.IncrementalLearningSkeleton
+import org.apache.flink.streaming.scala.examples.twitter.TwitterExample
+import org.apache.flink.streaming.scala.examples.windowing.{SessionWindowing, WindowWordCount}
+import org.apache.flink.streaming.scala.examples.wordcount.WordCount
+import org.apache.flink.streaming.test.examples.join.WindowJoinData
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.TestBaseUtils
+
+import org.junit.Test
+
+/**
+ * Integration test for streaming programs in Scala examples.
+ */
+class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
+
+ @Test
+ def testIterateExample(): Unit = {
+ val inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
+ val resultPath = getTempDirPath("result")
+
+ // the example is inherently non-deterministic. The iteration timeout of 5000 ms
+ // is frequently not enough to make the test run stable on CI infrastructure
+ // with very small containers, so we cannot do a validation here
+ IterateExample.main(Array(
+ "--input", inputPath,
+ "--output", resultPath
+ ))
+ }
+
+ @Test
+ def testWindowJoin(): Unit = {
+ val resultPath = File.createTempFile("result-path", "dir").toURI.toString
+ try {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val grades = env
+ .fromCollection(WindowJoinData.GRADES_INPUT.split("\n"))
+ .map( line => {
+ val fields = line.split(",")
+ Grade(fields(1), fields(2).toInt)
+ })
+
+ val salaries = env
+ .fromCollection(WindowJoinData.SALARIES_INPUT.split("\n"))
+ .map( line => {
+ val fields = line.split(",")
+ Salary(fields(1), fields(2).toInt)
+ })
+
+ WindowJoin.joinStreams(grades, salaries, 100)
+ .writeAsText(resultPath, WriteMode.OVERWRITE)
+
+ env.execute()
+
+ TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)")
+ } finally {
+ try {
+ FileUtils.deleteDirectory(new File(resultPath))
+ } catch {
+ case _: Throwable =>
+ }
+ }
+ }
+
+ @Test
+ def testIncrementalLearningSkeleton(): Unit = {
+ val resultPath = getTempDirPath("result")
+ IncrementalLearningSkeleton.main(Array("--output", resultPath))
+ TestBaseUtils.compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath)
+ }
+
+ @Test
+ def testTwitterExample(): Unit = {
+ val resultPath = getTempDirPath("result")
+ TwitterExample.main(Array("--output", resultPath))
+ TestBaseUtils.compareResultsByLinesInMemory(
+ TwitterExampleData.STREAMING_COUNTS_AS_TUPLES,
+ resultPath)
+ }
+
+ @Test
+ def testSessionWindowing(): Unit = {
+ val resultPath = getTempDirPath("result")
+ SessionWindowing.main(Array("--output", resultPath))
+ TestBaseUtils.compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath)
+ }
+
+ @Test
+ def testWindowWordCount(): Unit = {
+ val windowSize = "250"
+ val slideSize = "150"
+ val textPath = createTempFile("text.txt", WordCountData.TEXT)
+ val resultPath = getTempDirPath("result")
+
+ WindowWordCount.main(Array(
+ "--input", textPath,
+ "--output", resultPath,
+ "--window", windowSize,
+ "--slide", slideSize
+ ))
+
+ // since the parallel tokenizers may run at different speeds,
+ // the exact output cannot be checked, only that it is well-formed;
+ // the result lines must look like e.g. (faust, 2)
+ TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)")
+ }
+
+ @Test
+ def testWordCount(): Unit = {
+ val textPath = createTempFile("text.txt", WordCountData.TEXT)
+ val resultPath = getTempDirPath("result")
+
+ WordCount.main(Array(
+ "--input", textPath,
+ "--output", resultPath
+ ))
+
+ TestBaseUtils.compareResultsByLinesInMemory(
+ WordCountData.STREAMING_COUNTS_AS_TUPLES,
+ resultPath)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
deleted file mode 100644
index 0e67be5..0000000
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples
-
-import java.io.File
-
-import org.apache.commons.io.FileUtils
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.scala.examples.join.WindowJoin
-import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
-import org.apache.flink.streaming.test.examples.join.WindowJoinData
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-
-class WindowJoinITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testProgram(): Unit = {
-
- val resultPath: String = File.createTempFile("result-path", "dir").toURI().toString()
- try {
- val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-
- val grades: DataStream[Grade] = env
- .fromCollection(WindowJoinData.GRADES_INPUT.split("\n"))
- .map( line => {
- val fields = line.split(",")
- Grade(fields(1), fields(2).toInt)
- })
-
- val salaries: DataStream[Salary] = env
- .fromCollection(WindowJoinData.SALARIES_INPUT.split("\n"))
- .map( line => {
- val fields = line.split(",")
- Salary(fields(1), fields(2).toInt)
- })
-
- WindowJoin.joinStreams(grades, salaries, 100)
- .writeAsText(resultPath, WriteMode.OVERWRITE)
-
- env.execute()
-
- TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)")
- }
- finally {
- try {
- FileUtils.deleteDirectory(new File(resultPath))
- }
- catch {
- case _ : Throwable =>
- }
- }
- }
-
-}
[3/3] flink git commit: [FLINK-7025] [table] Port non-partitioned unbounded proctime Over window to keyed state.
Posted by fh...@apache.org.
[FLINK-7025] [table] Port non-partitioned unbounded proctime Over window to keyed state.
This closes #4212.
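For context, the pattern this port builds on: Flink's NullByteKeySelector assigns the
same constant key to every record, so a logically non-partitioned stream can still go
through keyBy() and let the downstream operator use keyed state and timers. A minimal
standalone sketch of the same idea in the Scala API, with a constant-key lambda standing
in for NullByteKeySelector (illustrative only, not code from this commit):

  import org.apache.flink.streaming.api.scala._

  object ConstantKeySketch {
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      // Every record maps to the same key, so all records land in one key
      // group and the downstream operator can use keyed state, just as the
      // ported ProcTimeUnboundedOver does with its ValueState[Row].
      env.fromElements("a", "b", "c")
        .keyBy(_ => 0.toByte) // constant key: one logical partition
        .map(_.toUpperCase) // stand-in for the Over-window process function
        .print()

      env.execute("constant-key sketch")
    }
  }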
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55ab34ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55ab34ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55ab34ff
Branch: refs/heads/master
Commit: 55ab34ff3b92b2567bcab80f39826a9667f1c9ce
Parents: 1cf1620
Author: sunjincheng121 <su...@gmail.com>
Authored: Wed Jun 28 23:19:56 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jun 29 23:58:18 2017 +0200
----------------------------------------------------------------------
.../datastream/DataStreamOverAggregate.scala | 15 +--
.../table/runtime/aggregate/AggregateUtil.scala | 15 +--
.../ProcTimeUnboundedNonPartitionedOver.scala | 115 -------------------
.../aggregate/ProcTimeUnboundedOver.scala | 102 ++++++++++++++++
.../ProcTimeUnboundedPartitionedOver.scala | 102 ----------------
.../api/scala/stream/sql/OverWindowITCase.scala | 8 +-
.../runtime/harness/OverWindowHarnessTest.scala | 4 +-
7 files changed, 118 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 08f0356..c03dac6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -242,17 +242,10 @@ class DataStreamOverAggregate(
}
// non-partitioned aggregation
else {
- if (isRowTimeType) {
- inputDS.keyBy(new NullByteKeySelector[CRow])
- .process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(returnTypeInfo)
- .name(aggOpName)
- } else {
- inputDS
- .process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(returnTypeInfo)
- .name(aggOpName)
- }
+ inputDS.keyBy(new NullByteKeySelector[CRow])
+ .process(processFunction).setParallelism(1).setMaxParallelism(1)
+ .returns(returnTypeInfo)
+ .name(aggOpName)
}
result
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 04c5070..f4ead48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -129,17 +129,10 @@ object AggregateUtil {
queryConfig)
}
} else {
- if (isPartitioned) {
- new ProcTimeUnboundedPartitionedOver(
- genFunction,
- aggregationStateType,
- queryConfig)
- } else {
- new ProcTimeUnboundedNonPartitionedOver(
- genFunction,
- aggregationStateType,
- queryConfig)
- }
+ new ProcTimeUnboundedOver(
+ genFunction,
+ aggregationStateType,
+ queryConfig)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
deleted file mode 100644
index f86bed2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.StreamQueryConfig
-import org.apache.flink.util.Collector
-import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-
-/**
- * Process Function for non-partitioned processing-time unbounded OVER window
- *
- * @param genAggregations Generated aggregate helper function
- * @param aggregationStateType row type info of aggregation
- */
-class ProcTimeUnboundedNonPartitionedOver(
- genAggregations: GeneratedAggregationsFunction,
- aggregationStateType: RowTypeInfo,
- queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
- with CheckpointedFunction
- with Compiler[GeneratedAggregations] {
-
- private var accumulators: Row = _
- private var output: CRow = _
- private var state: ListState[Row] = _
- val LOG = LoggerFactory.getLogger(this.getClass)
-
- private var function: GeneratedAggregations = _
-
- override def open(config: Configuration) {
- LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
- s"Code:\n${genAggregations.code}")
- val clazz = compile(
- getRuntimeContext.getUserCodeClassLoader,
- genAggregations.name,
- genAggregations.code)
- LOG.debug("Instantiating AggregateHelper.")
- function = clazz.newInstance()
-
- output = new CRow(function.createOutputRow(), true)
- if (null == accumulators) {
- val it = state.get().iterator()
- if (it.hasNext) {
- accumulators = it.next()
- } else {
- accumulators = function.createAccumulators()
- }
- }
- initCleanupTimeState("ProcTimeUnboundedNonPartitionedOverCleanupTime")
- }
-
- override def processElement(
- inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
- // register state-cleanup timer
- registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
-
-
- val input = inputC.row
-
- function.setForwardedFields(input, output.row)
-
- function.accumulate(accumulators, input)
- function.setAggregationResults(accumulators, output.row)
-
- out.collect(output)
- }
-
- override def onTimer(
- timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
- out: Collector[CRow]): Unit = {
-
- if (needToCleanupState(timestamp)) {
- cleanupState(state)
- }
- }
-
- override def snapshotState(context: FunctionSnapshotContext): Unit = {
- state.clear()
- if (null != accumulators) {
- state.add(accumulators)
- }
- }
-
- override def initializeState(context: FunctionInitializationContext): Unit = {
- val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType)
- state = context.getOperatorStateStore.getListState(accumulatorsDescriptor)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
new file mode 100644
index 0000000..7a7b44d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
+import org.apache.flink.table.runtime.types.CRow
+import org.slf4j.LoggerFactory
+
+/**
+ * Process Function for processing-time unbounded OVER window
+ *
+ * @param genAggregations Generated aggregate helper function
+ * @param aggregationStateType row type info of aggregation
+ */
+class ProcTimeUnboundedOver(
+ genAggregations: GeneratedAggregationsFunction,
+ aggregationStateType: RowTypeInfo,
+ queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+ with Compiler[GeneratedAggregations] {
+
+ private var output: CRow = _
+ private var state: ValueState[Row] = _
+ val LOG = LoggerFactory.getLogger(this.getClass)
+ private var function: GeneratedAggregations = _
+
+ override def open(config: Configuration) {
+ LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
+ s"Code:\n${genAggregations.code}")
+ val clazz = compile(
+ getRuntimeContext.getUserCodeClassLoader,
+ genAggregations.name,
+ genAggregations.code)
+ LOG.debug("Instantiating AggregateHelper.")
+ function = clazz.newInstance()
+
+ output = new CRow(function.createOutputRow(), true)
+ val stateDescriptor: ValueStateDescriptor[Row] =
+ new ValueStateDescriptor[Row]("overState", aggregationStateType)
+ state = getRuntimeContext.getState(stateDescriptor)
+
+ initCleanupTimeState("ProcTimeUnboundedPartitionedOverCleanupTime")
+ }
+
+ override def processElement(
+ inputC: CRow,
+ ctx: ProcessFunction[CRow, CRow]#Context,
+ out: Collector[CRow]): Unit = {
+
+ // register state-cleanup timer
+ registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
+
+ val input = inputC.row
+
+ var accumulators = state.value()
+
+ if (null == accumulators) {
+ accumulators = function.createAccumulators()
+ }
+
+ function.setForwardedFields(input, output.row)
+
+ function.accumulate(accumulators, input)
+ function.setAggregationResults(accumulators, output.row)
+
+ state.update(accumulators)
+ out.collect(output)
+ }
+
+ override def onTimer(
+ timestamp: Long,
+ ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ out: Collector[CRow]): Unit = {
+
+ if (needToCleanupState(timestamp)) {
+ cleanupState(state)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
deleted file mode 100644
index ad43d94..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.state.ValueStateDescriptor
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.table.api.StreamQueryConfig
-import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
-import org.apache.flink.table.runtime.types.CRow
-import org.slf4j.LoggerFactory
-
-/**
- * Process Function for processing-time unbounded OVER window
- *
- * @param genAggregations Generated aggregate helper function
- * @param aggregationStateType row type info of aggregation
- */
-class ProcTimeUnboundedPartitionedOver(
- genAggregations: GeneratedAggregationsFunction,
- aggregationStateType: RowTypeInfo,
- queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
- with Compiler[GeneratedAggregations] {
-
- private var output: CRow = _
- private var state: ValueState[Row] = _
- val LOG = LoggerFactory.getLogger(this.getClass)
- private var function: GeneratedAggregations = _
-
- override def open(config: Configuration) {
- LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n" +
- s"Code:\n${genAggregations.code}")
- val clazz = compile(
- getRuntimeContext.getUserCodeClassLoader,
- genAggregations.name,
- genAggregations.code)
- LOG.debug("Instantiating AggregateHelper.")
- function = clazz.newInstance()
-
- output = new CRow(function.createOutputRow(), true)
- val stateDescriptor: ValueStateDescriptor[Row] =
- new ValueStateDescriptor[Row]("overState", aggregationStateType)
- state = getRuntimeContext.getState(stateDescriptor)
-
- initCleanupTimeState("ProcTimeUnboundedPartitionedOverCleanupTime")
- }
-
- override def processElement(
- inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
- out: Collector[CRow]): Unit = {
-
- // register state-cleanup timer
- registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
-
- val input = inputC.row
-
- var accumulators = state.value()
-
- if (null == accumulators) {
- accumulators = function.createAccumulators()
- }
-
- function.setForwardedFields(input, output.row)
-
- function.accumulate(accumulators, input)
- function.setAggregationResults(accumulators, output.row)
-
- state.update(accumulators)
- out.collect(output)
- }
-
- override def onTimer(
- timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
- out: Collector[CRow]): Unit = {
-
- if (needToCleanupState(timestamp)) {
- cleanupState(state)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index 7a24f50..397e72c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -18,12 +18,13 @@
package org.apache.flink.table.api.scala.stream.sql
+import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.stream.sql.OverWindowITCase.EventTimeSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, TableException}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
@@ -217,6 +218,9 @@ class OverWindowITCase extends StreamingWithStateTestBase {
@Test
def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+ val queryConfig =
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(2), Time.hours(3))
+
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend)
val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -235,7 +239,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
"sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
"from T1"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
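A note on the test change above: after the port, the non-partitioned Over window runs on
keyed state, so the idle-state-retention cleanup of ProcessFunctionWithCleanupState now
applies to it as well, which is what the added StreamQueryConfig exercises. Configured on
its own it looks as follows (a sketch against the flink-table API of this release):

  import org.apache.flink.api.common.time.Time
  import org.apache.flink.table.api.StreamQueryConfig

  // minimum and maximum time the state of an idle key is retained
  // before the registered cleanup timer clears it
  val queryConfig: StreamQueryConfig = new StreamQueryConfig()
    .withIdleStateRetentionTime(Time.hours(2), Time.hours(3))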
http://git-wip-us.apache.org/repos/asf/flink/blob/55ab34ff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 786a843..8cad64f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.runtime.harness
import java.lang.{Integer => JInt, Long => JLong}
-import java.util.concurrent.{ConcurrentLinkedQueue}
+import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
@@ -276,7 +276,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
def testProcTimeUnboundedOver(): Unit = {
val processFunction = new KeyedProcessOperator[String, CRow, CRow](
- new ProcTimeUnboundedPartitionedOver(
+ new ProcTimeUnboundedOver(
genMinMaxAggFunction,
minMaxAggregationStateType,
queryConfig))
[2/3] flink git commit: [FLINK-3551] [examples] Sync Scala streaming examples with Java examples.
Posted by fh...@apache.org.
[FLINK-3551] [examples] Sync Scala streaming examples with Java examples.
- Move Java example tests into a single ITCase
- Add ITCase for Scala examples
This closes #2761.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cf16207
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cf16207
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cf16207
Branch: refs/heads/master
Commit: 1cf162079c990e0504d736df768b0a8729d8faab
Parents: e3bef55
Author: Lim Chee Hau <ch...@gmail.com>
Authored: Sun Nov 6 00:41:59 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jun 29 23:58:17 2017 +0200
----------------------------------------------------------------------
.../ml/IncrementalLearningSkeleton.java | 2 +-
.../GroupedProcessingTimeWindowExample.java | 74 ++++----
.../examples/windowing/SessionWindowing.java | 7 +-
.../examples/wordcount/PojoExample.java | 160 ----------------
.../examples/iteration/IterateExample.scala | 135 ++++++++++++++
.../scala/examples/join/WindowJoin.scala | 6 +-
.../scala/examples/kafka/ReadFromKafka.scala | 73 ++++++++
.../scala/examples/kafka/WriteIntoKafka.scala | 84 +++++++++
.../ml/IncrementalLearningSkeleton.scala | 184 +++++++++++++++++++
.../examples/socket/SocketWindowWordCount.scala | 8 +-
.../scala/examples/twitter/TwitterExample.scala | 148 +++++++++++++++
.../GroupedProcessingTimeWindowExample.scala | 86 +++++++++
.../examples/windowing/SessionWindowing.scala | 92 ++++++++++
.../examples/windowing/WindowWordCount.scala | 100 ++++++++++
.../scala/examples/wordcount/WordCount.scala | 91 +++++++++
.../streaming/test/StreamingExamplesITCase.java | 154 ++++++++++++++++
.../iteration/IterateExampleITCase.java | 51 -----
.../test/examples/join/WindowJoinITCase.java | 84 ---------
.../ml/IncrementalLearningSkeletonITCase.java | 45 -----
.../examples/twitter/TwitterStreamITCase.java | 45 -----
.../windowing/SessionWindowingITCase.java | 45 -----
.../windowing/WindowWordCountITCase.java | 57 ------
.../examples/wordcount/PojoExampleITCase.java | 50 -----
.../examples/wordcount/WordCountITCase.java | 50 -----
.../examples/StreamingExamplesITCase.scala | 158 ++++++++++++++++
.../scala/examples/WindowJoinITCase.scala | 75 --------
26 files changed, 1354 insertions(+), 710 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 0063669..7908273 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -181,7 +181,7 @@ public class IncrementalLearningSkeleton {
* Creates newData using the model produced in batch-processing and the
* up-to-date partial model.
* <p>
- * By defaults emits the Integer 0 for every newData and the Integer 1
+ * By default emits the Integer 0 for every newData and the Integer 1
* for every model update.
* </p>
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index bedc130..1837314 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -34,9 +34,9 @@ import org.apache.flink.util.Collector;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
- * Example of grouped processing time windows.
+ * An example of grouped stream windowing into sliding time windows.
+ * This example uses [[RichParallelSourceFunction]] to generate a list of key-value pairs.
*/
-@SuppressWarnings("serial")
public class GroupedProcessingTimeWindowExample {
public static void main(String[] args) throws Exception {
@@ -44,39 +44,7 @@ public class GroupedProcessingTimeWindowExample {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
- DataStream<Tuple2<Long, Long>> stream = env
- .addSource(new RichParallelSourceFunction<Tuple2<Long, Long>>() {
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-
- final long startTime = System.currentTimeMillis();
-
- final long numElements = 20000000;
- final long numKeys = 10000;
- long val = 1L;
- long count = 0L;
-
- while (running && count < numElements) {
- count++;
- ctx.collect(new Tuple2<>(val++, 1L));
-
- if (val > numKeys) {
- val = 1L;
- }
- }
-
- final long endTime = System.currentTimeMillis();
- System.out.println("Took " + (endTime - startTime) + " msecs for " + numElements + " values");
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- });
+ DataStream<Tuple2<Long, Long>> stream = env.addSource(new DataSource());
stream
.keyBy(0)
@@ -126,4 +94,40 @@ public class GroupedProcessingTimeWindowExample {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
}
+
+ /**
+ * Parallel data source that serves a list of key-value pairs.
+ */
+ private static class DataSource extends RichParallelSourceFunction<Tuple2<Long, Long>> {
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+ final long startTime = System.currentTimeMillis();
+
+ final long numElements = 20000000;
+ final long numKeys = 10000;
+ long val = 1L;
+ long count = 0L;
+
+ while (running && count < numElements) {
+ count++;
+ ctx.collect(new Tuple2<>(val++, 1L));
+
+ if (val > numKeys) {
+ val = 1L;
+ }
+ }
+
+ final long endTime = System.currentTimeMillis();
+ System.out.println("Took " + (endTime - startTime) + " msecs for " + numElements + " values");
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index e4f4522..0c02d5b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -31,8 +31,8 @@ import java.util.ArrayList;
import java.util.List;
/**
- * An example of session windowing where events are keyed by ID and grouped
- * and counted within session windows with a timeout of 3 time units.
+ * An example of session windowing that keys events by ID and groups and counts them in
+ * session with gaps of 3 milliseconds.
*/
public class SessionWindowing {
@@ -70,9 +70,6 @@ public class SessionWindowing {
for (Tuple3<String, Long, Integer> value : input) {
ctx.collectWithTimestamp(value, value.f1);
ctx.emitWatermark(new Watermark(value.f1 - 1));
- if (!fileOutput) {
- System.out.println("Collected: " + value);
- }
}
ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
deleted file mode 100644
index a8ece16..0000000
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.wordcount;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
-import org.apache.flink.util.Collector;
-
-/**
- * This example shows an implementation of WordCount without using the Tuple2
- * type, but a custom class.
- *
- * <p>Usage: <code>WordCount --input <path> --output <path></code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link WordCountData}.
- *
- * <p>This example shows how to:
- * <ul>
- * <li>use POJO data types,
- * <li>write a simple Flink program,
- * <li>write and use user-defined functions.
- * </ul>
- */
-public class PojoExample {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(String[] args) throws Exception {
-
- // Checking input parameters
- final ParameterTool params = ParameterTool.fromArgs(args);
-
- // set up the execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // make parameters available in the web interface
- env.getConfig().setGlobalJobParameters(params);
-
- // get input data
- DataStream<String> text;
- if (params.has("input")) {
- // read the text file from given input path
- text = env.readTextFile(params.get("input"));
- } else {
- System.out.println("Executing WordCountPojo example with default input data set.");
- System.out.println("Use --input to specify file input.");
- // get default test text data
- text = env.fromElements(WordCountData.WORDS);
- }
-
- DataStream<Word> counts =
- // split up the lines into Word objects
- text.flatMap(new Tokenizer())
- // group by the field word and sum up the frequency
- .keyBy("word").sum("frequency");
-
- if (params.has("output")) {
- counts.writeAsText(params.get("output"));
- } else {
- System.out.println("Printing result to stdout. Use --output to specify output path.");
- counts.print();
- }
-
- // execute program
- env.execute("WordCount Pojo Example");
- }
-
- // *************************************************************************
- // DATA TYPES
- // *************************************************************************
-
- /**
- * This is the POJO (Plain Old Java Object) that is being used for all the
- * operations. As long as all fields are public or have a getter/setter, the
- * system can handle them
- */
- public static class Word {
-
- private String word;
- private Integer frequency;
-
- public Word() {
- }
-
- public Word(String word, int i) {
- this.word = word;
- this.frequency = i;
- }
-
- public String getWord() {
- return word;
- }
-
- public void setWord(String word) {
- this.word = word;
- }
-
- public Integer getFrequency() {
- return frequency;
- }
-
- public void setFrequency(Integer frequency) {
- this.frequency = frequency;
- }
-
- @Override
- public String toString() {
- return "(" + word + "," + frequency + ")";
- }
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- /**
- * Implements the string tokenizer that splits sentences into words as a
- * user-defined FlatMapFunction. The function takes a line (String) and
- * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
- * Integer>}).
- */
- public static final class Tokenizer implements FlatMapFunction<String, Word> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(String value, Collector<Word> out) {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Word(token, 1));
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
new file mode 100644
index 0000000..ecbf7c5
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+ * Example illustrating iterations in Flink streaming.
+ *
+ * The program sums up random numbers and counts additions
+ * it performs to reach a specific threshold in an iterative streaming fashion.
+ *
+ * This example shows how to use:
+ *
+ * - streaming iterations,
+ * - buffer timeout to enhance latency,
+ * - directed outputs.
+ *
+ */
+object IterateExample {
+
+ private val Bound = 100
+
+ def main(args: Array[String]): Unit = {
+ // Checking input parameters
+ val params = ParameterTool.fromArgs(args)
+
+ // obtain execution environment and set setBufferTimeout to 1 to enable
+ // continuous flushing of the output buffers (lowest latency)
+ val env = StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
+
+ // make parameters available in the web interface
+ env.getConfig.setGlobalJobParameters(params)
+
+ // create input stream of integer pairs
+ val inputStream: DataStream[(Int, Int)] =
+ if (params.has("input")) {
+ // map a list of strings to integer pairs
+ env.readTextFile(params.get("input")).map { value: String =>
+ val record = value.substring(1, value.length - 1)
+ val splitted = record.split(",")
+ (Integer.parseInt(splitted(0)), Integer.parseInt(splitted(1)))
+ }
+ } else {
+ println("Executing Iterate example with default input data set.")
+ println("Use --input to specify file input.")
+ env.addSource(new RandomFibonacciSource)
+ }
+
+ def withinBound(value: (Int, Int)) = value._1 < Bound && value._2 < Bound
+
+ // create an iterative data stream from the input with 5 second timeout
+ val numbers: DataStream[((Int, Int), Int)] = inputStream
+ // Map the inputs so that the next Fibonacci numbers can be calculated
+ // while preserving the original input tuple
+ // A counter is attached to the tuple and incremented in every iteration step
+ .map(value => (value._1, value._2, value._1, value._2, 0))
+ .iterate(
+ (iteration: DataStream[(Int, Int, Int, Int, Int)]) => {
+ // calculates the next Fibonacci number and increment the counter
+ val step = iteration.map(value =>
+ (value._1, value._2, value._4, value._3 + value._4, value._5 + 1))
+ // testing which tuple needs to be iterated again
+ val feedback = step.filter(value => withinBound(value._3, value._4))
+ // giving back the input pair and the counter
+ val output: DataStream[((Int, Int), Int)] = step
+ .filter(value => !withinBound(value._3, value._4))
+ .map(value => ((value._1, value._2), value._5))
+ (feedback, output)
+ }
+ // timeout after 5 seconds
+ , 5000L
+ )
+
+ if (params.has("output")) {
+ numbers.writeAsText(params.get("output"))
+ } else {
+ println("Printing result to stdout. Use --output to specify output path.")
+ numbers.print()
+ }
+
+ env.execute("Streaming Iteration Example")
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Generate BOUND number of random integer pairs from the range from 0 to BOUND/2
+ */
+ private class RandomFibonacciSource extends SourceFunction[(Int, Int)] {
+
+ val rnd = new Random()
+ var counter = 0
+ @volatile var isRunning = true
+
+ override def run(ctx: SourceContext[(Int, Int)]): Unit = {
+
+ while (isRunning && counter < Bound) {
+ val first = rnd.nextInt(Bound / 2 - 1) + 1
+ val second = rnd.nextInt(Bound / 2 - 1) + 1
+
+ ctx.collect((first, second))
+ counter += 1
+ Thread.sleep(50L)
+ }
+ }
+
+ override def cancel(): Unit = isRunning = false
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 73be261..0e6fc37 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -78,10 +78,10 @@ object WindowJoin {
joined.print().setParallelism(1)
// execute program
- env.execute("WindowJoin")
+ env.execute("Windowed Join Example")
}
-
-
+
+
def joinStreams(
grades: DataStream[Grade],
salaries: DataStream[Salary],
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
new file mode 100644
index 0000000..3127ab7
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/ReadFromKafka.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.kafka
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+
+/**
+ * Read Strings from Kafka and print them to standard out.
+ * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file!
+ *
+ * Please pass the following arguments to run the example:
+ * {{{
+ * --topic test
+ * --bootstrap.servers localhost:9092
+ * --zookeeper.connect localhost:2181
+ * --group.id myconsumer
+ * }}}
+ */
+object ReadFromKafka {
+
+ def main(args: Array[String]): Unit = {
+
+ // parse input arguments
+ val params = ParameterTool.fromArgs(args)
+
+ if (params.getNumberOfParameters < 4) {
+ println("Missing parameters!\nUsage: Kafka --topic <topic> " +
+ "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>")
+ return
+ }
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.disableSysoutLogging
+ env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
+ // create a checkpoint every 5 seconds
+ env.enableCheckpointing(5000)
+ // make parameters available in the web interface
+ env.getConfig.setGlobalJobParameters(params)
+
+ // create a Kafka streaming source consumer for Kafka 0.8.x
+ val kafkaConsumer = new FlinkKafkaConsumer08(
+ params.getRequired("topic"),
+ new SimpleStringSchema,
+ params.getProperties)
+ val messageStream = env.addSource(kafkaConsumer)
+
+ // write kafka stream to standard out.
+ messageStream.print()
+
+ env.execute("Read from Kafka example")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala
new file mode 100644
index 0000000..e34083a
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/WriteIntoKafka.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.kafka
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema
+
+/**
+ * Generate a String every 500 ms and write it into a Kafka topic
+ *
+ * Please pass the following arguments to run the example:
+ * {{{
+ * --topic test
+ * --bootstrap.servers localhost:9092
+ * }}}
+ */
+object WriteIntoKafka {
+
+ def main(args: Array[String]): Unit = {
+
+ // parse input arguments
+ val params = ParameterTool.fromArgs(args)
+
+ if (params.getNumberOfParameters < 2) {
+ println("Missing parameters!")
+ println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>")
+ return
+ }
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.disableSysoutLogging
+ env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
+
+ // very simple data generator
+ val messageStream: DataStream[String] = env.addSource(new SourceFunction[String]() {
+ var running = true
+
+ override def run(ctx: SourceContext[String]): Unit = {
+ var i = 0L
+ while (this.running) {
+ ctx.collect(s"Element - ${i}")
+ i += 1
+ Thread.sleep(500)
+ }
+ }
+
+ override def cancel(): Unit = running = false
+ })
+
+ // create a Kafka producer for Kafka 0.8.x
+ val kafkaProducer = new FlinkKafkaProducer08(
+ params.getRequired("topic"),
+ new SimpleStringSchema,
+ params.getProperties)
+
+ // write data into Kafka
+ messageStream.addSink(kafkaProducer)
+
+ env.execute("Write into Kafka example")
+ }
+
+}
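
One caveat on the data generator above: cancel() may be called from a different thread
than the one executing run(), so the cancellation flag is commonly declared @volatile to
guarantee visibility (the DataSource in GroupedProcessingTimeWindowExample further down
does exactly that). A sketch of the same generator with that change; the class name is
made up for illustration:

    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

    class ThrottledSequenceSource extends SourceFunction[String] {
      // volatile, so the write in cancel() is visible to the loop in run()
      @volatile private var running = true

      override def run(ctx: SourceContext[String]): Unit = {
        var i = 0L
        while (running) {
          ctx.collect(s"Element - $i")
          i += 1
          Thread.sleep(500)
        }
      }

      override def cancel(): Unit = running = false
    }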
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
new file mode 100644
index 0000000..7b40cd5
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+ * Skeleton for an incremental machine learning algorithm, consisting of a
+ * pre-computed model that gets updated with new training data, and new input
+ * data for which the job provides predictions.
+ *
+ * This may serve as a base for a number of algorithms, e.g. updating an
+ * incremental Alternating Least Squares model while also providing the
+ * predictions.
+ *
+ * This example shows how to use:
+ *
+ * - Connected streams
+ * - CoFunctions
+ * - Tuple data types
+ *
+ */
+object IncrementalLearningSkeleton {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ def main(args: Array[String]): Unit = {
+ // Checking input parameters
+ val params = ParameterTool.fromArgs(args)
+
+ // set up the execution environment
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ // build new model on every second of new data
+ val trainingData: DataStream[Int] = env.addSource(new FiniteTrainingDataSource)
+ val newData: DataStream[Int] = env.addSource(new FiniteNewDataSource)
+
+ val model: DataStream[Array[Double]] = trainingData
+ .assignTimestampsAndWatermarks(new LinearTimestamp)
+ .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+ .apply(new PartialModelBuilder)
+
+ // use partial model for newData
+ val prediction: DataStream[Int] = newData.connect(model).map(new Predictor)
+
+ // emit result
+ if (params.has("output")) {
+ prediction.writeAsText(params.get("output"))
+ } else {
+ println("Printing result to stdout. Use --output to specify output path.")
+ prediction.print()
+ }
+
+ // execute program
+ env.execute("Streaming Incremental Learning")
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Feeds the stream of new data to predict on. By default it emits the
+ * Integer 1 fifty times, with a short pause between elements.
+ */
+ private class FiniteNewDataSource extends SourceFunction[Int] {
+ override def run(ctx: SourceContext[Int]) = {
+ Thread.sleep(15)
+ (0 until 50).foreach{ _ =>
+ Thread.sleep(5)
+ ctx.collect(1)
+ }
+ }
+
+ override def cancel() = {
+ // No cleanup needed
+ }
+ }
+
+ /**
+ * Feeds new training data to the partial model builder. By default it
+ * emits the Integer 1 a fixed number of times.
+ */
+ private class FiniteTrainingDataSource extends SourceFunction[Int] {
+ override def run(ctx: SourceContext[Int]) = (0 until 8200).foreach( _ => ctx.collect(1) )
+
+ override def cancel() = {
+ // No cleanup needed
+ }
+ }
+
+ private class LinearTimestamp extends AssignerWithPunctuatedWatermarks[Int] {
+ var counter = 0L
+
+ override def extractTimestamp(element: Int, previousElementTimestamp: Long): Long = {
+ counter += 10L
+ counter
+ }
+
+ override def checkAndGetNextWatermark(lastElement: Int, extractedTimestamp: Long) = {
+ new Watermark(counter - 1)
+ }
+ }
+
+ /**
+ * Builds up-to-date partial models on new training data.
+ */
+ private class PartialModelBuilder extends AllWindowFunction[Int, Array[Double], TimeWindow] {
+
+ protected def buildPartialModel(values: Iterable[Int]): Array[Double] = Array[Double](1)
+
+ override def apply(window: TimeWindow,
+ values: Iterable[Int],
+ out: Collector[Array[Double]]): Unit = {
+ out.collect(buildPartialModel(values))
+ }
+ }
+
+ /**
+ * Creates predictions for the new data using the model produced in batch
+ * processing and the up-to-date partial model.
+ *
+ * By default emits the Integer 0 for every new data element and the
+ * Integer 1 for every model update.
+ *
+ */
+ private class Predictor extends CoMapFunction[Int, Array[Double], Int] {
+
+ var batchModel: Array[Double] = null
+ var partialModel: Array[Double] = null
+
+ override def map1(value: Int): Int = {
+ // return a prediction for the new data element
+ predict(value)
+ }
+
+ override def map2(value: Array[Double]): Int = {
+ // Update model
+ partialModel = value
+ batchModel = getBatchModel()
+ 1
+ }
+
+ // pulls the model built by a batch job on the old training data
+ protected def getBatchModel(): Array[Double] = Array[Double](0)
+
+ // performs the prediction using the two models
+ protected def predict(inTuple: Int): Int = 0
+ }
+
+}
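
The connect/CoMapFunction pattern at the heart of this skeleton can be tried in
isolation. A minimal, self-contained sketch with made-up data; map1 handles elements of
the first stream, map2 elements of the second, and both feed a single output stream:

    import org.apache.flink.streaming.api.functions.co.CoMapFunction
    import org.apache.flink.streaming.api.scala._

    object ConnectSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val data: DataStream[Int] = env.fromElements(1, 2, 3)
        val control: DataStream[String] = env.fromElements("model-update")

        // one operator, two typed inputs, one output stream
        data.connect(control)
          .map(new CoMapFunction[Int, String, String] {
            override def map1(value: Int): String = s"data: $value"
            override def map2(value: String): String = s"control: $value"
          })
          .print()

        env.execute("Connect sketch")
      }
    }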
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
index d2afa4d..e607f61 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
@@ -27,10 +27,10 @@ import org.apache.flink.streaming.api.windowing.time.Time
*
* This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
- * using the <i>netcat</i> tool via
- * <pre>
+ * using the ''netcat'' tool via
+ * {{{
* nc -l 12345
- * </pre>
+ * }}}
 * and run this example with the hostname and the port as arguments.
*/
object SocketWindowWordCount {
@@ -61,7 +61,7 @@ object SocketWindowWordCount {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
- val text = env.socketTextStream(hostname, port, '\n')
+ val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
// parse the data, group it, window it, and aggregate the counts
val windowCounts = text
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
new file mode 100644
index 0000000..d916116
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.apache.flink.util.Collector
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Implements the "TwitterStream" program that computes word occurrence
+ * counts over JSON tweet objects in a streaming fashion.
+ *
+ * The input is a Tweet stream from a TwitterSource.
+ *
+ * Usage:
+ * {{{
+ * TwitterExample [--output <path>]
+ * [--twitter-source.consumerKey <key>
+ * --twitter-source.consumerSecret <secret>
+ * --twitter-source.token <token>
+ * --twitter-source.tokenSecret <tokenSecret>]
+ * }}}
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[TwitterExampleData]].
+ *
+ * This example shows how to:
+ *
+ * - acquire external data,
+ * - use in-line defined functions,
+ * - handle flattened stream inputs.
+ *
+ */
+object TwitterExample {
+
+ def main(args: Array[String]): Unit = {
+
+ // Checking input parameters
+ val params = ParameterTool.fromArgs(args)
+ println("Usage: TwitterExample [--output <path>] " +
+ "[--twitter-source.consumerKey <key> " +
+ "--twitter-source.consumerSecret <secret> " +
+ "--twitter-source.token <token> " +
+ "--twitter-source.tokenSecret <tokenSecret>]")
+
+ // set up the execution environment
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ // make parameters available in the web interface
+ env.getConfig.setGlobalJobParameters(params)
+
+ env.setParallelism(params.getInt("parallelism", 1))
+
+ // get input data
+ val streamSource: DataStream[String] =
+ if (params.has(TwitterSource.CONSUMER_KEY) &&
+ params.has(TwitterSource.CONSUMER_SECRET) &&
+ params.has(TwitterSource.TOKEN) &&
+ params.has(TwitterSource.TOKEN_SECRET)
+ ) {
+ env.addSource(new TwitterSource(params.getProperties))
+ } else {
+ print("Executing TwitterStream example with default props.")
+ print("Use --twitter-source.consumerKey <key> --twitter-source.consumerSecret <secret> " +
+ "--twitter-source.token <token> " +
+ "--twitter-source.tokenSecret <tokenSecret> specify the authentication info."
+ )
+ // get default test text data
+ env.fromElements(TwitterExampleData.TEXTS: _*)
+ }
+
+ val tweets: DataStream[(String, Int)] = streamSource
+ // selecting English tweets and splitting to (word, 1)
+ .flatMap(new SelectEnglishAndTokenizeFlatMap)
+ // group by words and sum their occurrences
+ .keyBy(0).sum(1)
+
+ // emit result
+ if (params.has("output")) {
+ tweets.writeAsText(params.get("output"))
+ } else {
+ println("Printing result to stdout. Use --output to specify output path.")
+ tweets.print()
+ }
+
+ // execute program
+ env.execute("Twitter Streaming Example")
+ }
+
+ /**
+ * Deserializes JSON from the Twitter source.
+ *
+ * Implements a string tokenizer that splits sentences into words as a
+ * user-defined FlatMapFunction. The function takes a line (String) and
+ * splits it into multiple pairs in the form of "(word,1)" ({{{ (String, Int) }}}).
+ */
+ private class SelectEnglishAndTokenizeFlatMap extends FlatMapFunction[String, (String, Int)] {
+ lazy val jsonParser = new ObjectMapper()
+
+ override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
+ // deserialize JSON from twitter source
+ val jsonNode = jsonParser.readValue(value, classOf[JsonNode])
+ val isEnglish = jsonNode.has("user") &&
+ jsonNode.get("user").has("lang") &&
+ jsonNode.get("user").get("lang").asText == "en"
+ val hasText = jsonNode.has("text")
+
+ (isEnglish, hasText, jsonNode) match {
+ case (true, true, node) => {
+ val tokens = new ListBuffer[(String, Int)]()
+ val tokenizer = new StringTokenizer(node.get("text").asText())
+
+ while (tokenizer.hasMoreTokens) {
+ val token = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase()
+ if (token.nonEmpty) out.collect((token, 1))
+ }
+ }
+ case _ =>
+ }
+ }
+ }
+}
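
When the credentials are assembled programmatically rather than via command-line flags,
the same TwitterSource property keys apply. A minimal sketch; the credential values are
placeholders:

    import java.util.Properties

    import org.apache.flink.streaming.connectors.twitter.TwitterSource

    object TwitterAuthSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.setProperty(TwitterSource.CONSUMER_KEY, "<key>")
        props.setProperty(TwitterSource.CONSUMER_SECRET, "<secret>")
        props.setProperty(TwitterSource.TOKEN, "<token>")
        props.setProperty(TwitterSource.TOKEN_SECRET, "<tokenSecret>")

        // hand the configured source to a streaming job, e.g. env.addSource(source)
        val source = new TwitterSource(props)
      }
    }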
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
new file mode 100644
index 0000000..090b3f5
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import java.util.concurrent.TimeUnit.MILLISECONDS
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+ * An example of grouped stream windowing into sliding time windows.
+ * This example uses [[RichParallelSourceFunction]] to generate a list of key-value pairs.
+ */
+object GroupedProcessingTimeWindowExample {
+
+ def main(args: Array[String]): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(4)
+
+ val stream: DataStream[(Long, Long)] = env.addSource(new DataSource)
+
+ stream
+ .keyBy(0)
+ .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
+ .reduce((value1, value2) => (value1._1, value1._2 + value2._2))
+ .addSink(new SinkFunction[(Long, Long)]() {
+ override def invoke(in: (Long, Long)): Unit = {}
+ })
+
+ env.execute()
+ }
+
+ /**
+ * Parallel data source that serves a list of key-value pairs.
+ */
+ private class DataSource extends RichParallelSourceFunction[(Long, Long)] {
+ @volatile private var running = true
+
+ override def run(ctx: SourceContext[(Long, Long)]): Unit = {
+ val startTime = System.currentTimeMillis()
+
+ val numElements = 20000000
+ val numKeys = 10000
+ var value = 1L
+ var count = 0L
+
+ while (running && count < numElements) {
+
+ ctx.collect((value, 1L))
+
+ count += 1
+ value += 1
+
+ if (value > numKeys) {
+ value = 1L
+ }
+ }
+
+ val endTime = System.currentTimeMillis()
+ println(s"Took ${endTime - startTime} msecs for ${numElements} values")
+ }
+
+ override def cancel(): Unit = running = false
+ }
+}
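
A note on the window parameters above: with a size of 2500 ms sliding every 500 ms, each
element falls into 2500 / 500 = 5 overlapping windows, so the reduce emits a partial sum
per key every 500 ms over the last 2.5 seconds of data.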
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
new file mode 100644
index 0000000..3723674
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+ * An example of grouped stream windowing in session windows with a session timeout of 3 msec.
+ * A source fetches elements with key, timestamp, and count.
+ */
+object SessionWindowing {
+
+ def main(args: Array[String]): Unit = {
+
+ val params = ParameterTool.fromArgs(args)
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ env.getConfig.setGlobalJobParameters(params)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+
+ val fileOutput = params.has("output")
+
+ val input = List(
+ ("a", 1L, 1),
+ ("b", 1L, 1),
+ ("b", 3L, 1),
+ ("b", 5L, 1),
+ ("c", 6L, 1),
+ // We expect to detect session "a" earlier than this point (the old
+ // functionality could only detect it here, when the next session starts)
+ ("a", 10L, 1),
+ // We expect to detect session "b" and "c" at this point as well
+ ("c", 11L, 1)
+ )
+
+ val source: DataStream[(String, Long, Int)] = env.addSource(
+ new SourceFunction[(String, Long, Int)]() {
+
+ override def run(ctx: SourceContext[(String, Long, Int)]): Unit = {
+ input.foreach(value => {
+ ctx.collectWithTimestamp(value, value._2)
+ ctx.emitWatermark(new Watermark(value._2 - 1))
+ })
+ ctx.emitWatermark(new Watermark(Long.MaxValue))
+ }
+
+ override def cancel(): Unit = {}
+
+ })
+
+ // We create sessions for each id with a max timeout of 3 time units
+ val aggregated: DataStream[(String, Long, Int)] = source
+ .keyBy(0)
+ .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
+ .sum(2)
+
+ if (fileOutput) {
+ aggregated.writeAsText(params.get("output"))
+ } else {
+ print("Printing result to stdout. Use --output to specify output path.")
+ aggregated.print()
+ }
+
+ env.execute()
+ }
+
+}
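
Tracing the 3-unit gap against the input above: key "b" arrives at timestamps 1, 3,
and 5, and since each gap is below 3, the three elements merge into a single session
with an aggregated count of 3. Key "a" (timestamps 1 and 10) and key "c" (timestamps
6 and 11) each have a gap of at least 3, so both produce two one-element sessions.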
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
new file mode 100644
index 0000000..349f253
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+
+/**
+ * Implements a windowed version of the streaming "WordCount" program.
+ *
+ * The input is a plain text file with lines separated by newline characters.
+ *
+ * Usage:
+ * {{{
+ * WordCount
+ * --input <path>
+ * --output <path>
+ * --window <n>
+ * --slide <n>
+ * }}}
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[WordCountData]].
+ *
+ * This example shows how to:
+ *
+ * - write a simple Flink Streaming program,
+ * - use tuple data types,
+ * - use basic windowing abstractions.
+ *
+ */
+object WindowWordCount {
+
+ def main(args: Array[String]): Unit = {
+
+ val params = ParameterTool.fromArgs(args)
+
+ // set up the execution environment
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ // get input data
+ val text =
+ if (params.has("input")) {
+ // read the text file from given input path
+ env.readTextFile(params.get("input"))
+ } else {
+ println("Executing WindowWordCount example with default input data set.")
+ println("Use --input to specify file input.")
+ // get default test text data
+ env.fromElements(WordCountData.WORDS: _*)
+ }
+
+ // make parameters available in the web interface
+ env.getConfig.setGlobalJobParameters(params)
+
+ val windowSize = params.getInt("window", 250)
+ val slideSize = params.getInt("slide", 150)
+
+ val counts: DataStream[(String, Int)] = text
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ .flatMap(_.toLowerCase.split("\\W+"))
+ .filter(_.nonEmpty)
+ .map((_, 1))
+ .keyBy(0)
+ // create count windows of windowSize records, sliding every slideSize records
+ .countWindow(windowSize, slideSize)
+ // group by the tuple field "0" and sum up tuple field "1"
+ .sum(1)
+
+ // emit result
+ if (params.has("output")) {
+ counts.writeAsText(params.get("output"))
+ } else {
+ println("Printing result to stdout. Use --output to specify output path.")
+ counts.print()
+ }
+
+ // execute program
+ env.execute("WindowWordCount")
+ }
+
+}
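
Note that countWindow(windowSize, slideSize) defines count-based sliding windows per
key: with the defaults above, a sum is emitted over the last 250 occurrences of a word
every time 150 new occurrences of that word arrive.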
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
new file mode 100644
index 0000000..74b726f
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.wordcount
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence
+ * histogram over text files in a streaming fashion.
+ *
+ * The input is a plain text file with lines separated by newline characters.
+ *
+ * Usage:
+ * {{{
+ * WordCount --input <path> --output <path>
+ * }}}
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[WordCountData]].
+ *
+ * This example shows how to:
+ *
+ * - write a simple Flink Streaming program,
+ * - use tuple data types,
+ * - write and use transformation functions.
+ *
+ */
+object WordCount {
+
+ def main(args: Array[String]): Unit = {
+
+ // Checking input parameters
+ val params = ParameterTool.fromArgs(args)
+
+ // set up the execution environment
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ // make parameters available in the web interface
+ env.getConfig.setGlobalJobParameters(params)
+
+ // get input data
+ val text =
+ // read the text file from given input path
+ if (params.has("input")) {
+ env.readTextFile(params.get("input"))
+ } else {
+ println("Executing WordCount example with default inputs data set.")
+ println("Use --input to specify file input.")
+ // get default test text data
+ env.fromElements(WordCountData.WORDS: _*)
+ }
+
+ val counts: DataStream[(String, Int)] = text
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ .flatMap(_.toLowerCase.split("\\W+"))
+ .filter(_.nonEmpty)
+ .map((_, 1))
+ // group by the tuple field "0" and sum up tuple field "1"
+ .keyBy(0)
+ .sum(1)
+
+ // emit result
+ if (params.has("output")) {
+ counts.writeAsText(params.get("output"))
+ } else {
+ println("Printing result to stdout. Use --output to specify output path.")
+ counts.print()
+ }
+
+ // execute program
+ env.execute("Streaming WordCount")
+ }
+}
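
Because the keyed sum here is not windowed, the stream emits an updated running total
for every incoming element. On the single input line "to be or not to be", for example,
the program prints (to,1), (be,1), (or,1), (not,1), (to,2), (be,2).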
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
new file mode 100644
index 0000000..4c47d59
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
+import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData;
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
+import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
+import org.apache.flink.streaming.test.examples.join.WindowJoinData;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+
+import java.io.File;
+
+/**
+ * Integration test for streaming programs in Java examples.
+ */
+public class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {
+
+ @Test
+ public void testIterateExample() throws Exception {
+ final String inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS);
+ final String resultPath = getTempDirPath("result");
+
+ // the example is inherently non-deterministic. The iteration timeout of 5000 ms
+ // is frequently not enough to make the test run stable on CI infrastructure
+ // with very small containers, so we cannot do a validation here
+ org.apache.flink.streaming.examples.iteration.IterateExample.main(new String[]{
+ "--input", inputPath,
+ "--output", resultPath});
+ }
+
+ @Test
+ public void testWindowJoin() throws Exception {
+
+ final String resultPath = File.createTempFile("result-path", "dir").toURI().toString();
+
+ final class Parser implements MapFunction<String, Tuple2<String, Integer>> {
+
+ @Override
+ public Tuple2<String, Integer> map(String value) throws Exception {
+ String[] fields = value.split(",");
+ return new Tuple2<>(fields[1], Integer.parseInt(fields[2]));
+ }
+ }
+
+ try {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> grades = env
+ .fromElements(WindowJoinData.GRADES_INPUT.split("\n"))
+ .map(new Parser());
+
+ DataStream<Tuple2<String, Integer>> salaries = env
+ .fromElements(WindowJoinData.SALARIES_INPUT.split("\n"))
+ .map(new Parser());
+
+ org.apache.flink.streaming.examples.join.WindowJoin
+ .runWindowJoin(grades, salaries, 100)
+ .writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ // since the two sides of the join might proceed at different speeds,
+ // the exact output cannot be checked, only whether it is well-formed;
+ // this checks that the result lines look like e.g. (bob, 2, 2015)
+ checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
+ } finally {
+ try {
+ FileUtils.deleteDirectory(new File(resultPath));
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+
+ @Test
+ public void testIncrementalLearningSkeleton() throws Exception {
+ final String resultPath = getTempDirPath("result");
+ org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton.main(new String[]{"--output", resultPath});
+ compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath);
+ }
+
+ @Test
+ public void testTwitterStream() throws Exception {
+ final String resultPath = getTempDirPath("result");
+ org.apache.flink.streaming.examples.twitter.TwitterExample.main(new String[]{"--output", resultPath});
+ compareResultsByLinesInMemory(TwitterExampleData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+ }
+
+ @Test
+ public void testSessionWindowing() throws Exception {
+ final String resultPath = getTempDirPath("result");
+ org.apache.flink.streaming.examples.windowing.SessionWindowing.main(new String[]{"--output", resultPath});
+ compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath);
+ }
+
+ @Test
+ public void testWindowWordCount() throws Exception {
+ final String windowSize = "250";
+ final String slideSize = "150";
+ final String textPath = createTempFile("text.txt", WordCountData.TEXT);
+ final String resultPath = getTempDirPath("result");
+
+ org.apache.flink.streaming.examples.windowing.WindowWordCount.main(new String[]{
+ "--input", textPath,
+ "--output", resultPath,
+ "--window", windowSize,
+ "--slide", slideSize});
+
+ // since the parallel tokenizers might proceed at different speeds,
+ // the exact output cannot be checked, only whether it is well-formed;
+ // this checks that the result lines look like e.g. (faust, 2)
+ checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
+ }
+
+ @Test
+ public void testWordCount() throws Exception {
+ final String textPath = createTempFile("text.txt", WordCountData.TEXT);
+ final String resultPath = getTempDirPath("result");
+
+ org.apache.flink.streaming.examples.wordcount.WordCount.main(new String[]{
+ "--input", textPath,
+ "--output", resultPath});
+
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/iteration/IterateExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/iteration/IterateExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/iteration/IterateExampleITCase.java
deleted file mode 100644
index e6ff1d4..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/iteration/IterateExampleITCase.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.iteration;
-
-import org.apache.flink.streaming.examples.iteration.IterateExample;
-import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-/**
- * Tests for {@link IterateExample}.
- */
-public class IterateExampleITCase extends StreamingProgramTestBase {
-
- protected String inputPath;
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- // the example is inherently non-deterministic. The iteration timeout of 5000 ms
- // is frequently not enough to make the test run stable on CI infrastructure
- // with very small containers, so we cannot do a validation here
- }
-
- @Override
- protected void testProgram() throws Exception {
- IterateExample.main(new String[]{
- "--input", inputPath,
- "--output", resultPath});
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/join/WindowJoinITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/join/WindowJoinITCase.java
deleted file mode 100644
index 525ff6f..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/join/WindowJoinITCase.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.join;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.join.WindowJoin;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Test;
-
-import java.io.File;
-
-/**
- * Tests for {@link WindowJoin}.
- */
-@SuppressWarnings("serial")
-public class WindowJoinITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- public void testProgram() throws Exception {
- final String resultPath = File.createTempFile("result-path", "dir").toURI().toString();
- try {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
- DataStream<Tuple2<String, Integer>> grades = env
- .fromElements(WindowJoinData.GRADES_INPUT.split("\n"))
- .map(new Parser());
-
- DataStream<Tuple2<String, Integer>> salaries = env
- .fromElements(WindowJoinData.SALARIES_INPUT.split("\n"))
- .map(new Parser());
-
- WindowJoin
- .runWindowJoin(grades, salaries, 100)
- .writeAsText(resultPath, WriteMode.OVERWRITE);
-
- env.execute();
-
- // since the two sides of the join might have different speed
- // the exact output can not be checked just whether it is well-formed
- // checks that the result lines look like e.g. (bob, 2, 2015)
- checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
- }
- finally {
- try {
- FileUtils.deleteDirectory(new File(resultPath));
- } catch (Throwable ignored) {}
- }
- }
-
- //-------------------------------------------------------------------------
-
- private static final class Parser implements MapFunction<String, Tuple2<String, Integer>> {
-
- @Override
- public Tuple2<String, Integer> map(String value) throws Exception {
- String[] fields = value.split(",");
- return new Tuple2<>(fields[1], Integer.parseInt(fields[2]));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/ml/IncrementalLearningSkeletonITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/ml/IncrementalLearningSkeletonITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/ml/IncrementalLearningSkeletonITCase.java
deleted file mode 100644
index d5b160d..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/ml/IncrementalLearningSkeletonITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.ml;
-
-import org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton;
-import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-/**
- * Tests for {@link IncrementalLearningSkeleton}.
- */
-public class IncrementalLearningSkeletonITCase extends StreamingProgramTestBase {
-
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- IncrementalLearningSkeleton.main(new String[]{"--output", resultPath});
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/twitter/TwitterStreamITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/twitter/TwitterStreamITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/twitter/TwitterStreamITCase.java
deleted file mode 100644
index 7f3b440..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/twitter/TwitterStreamITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.twitter;
-
-import org.apache.flink.streaming.examples.twitter.TwitterExample;
-import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-/**
- * Tests for {@link TwitterExample}.
- */
-public class TwitterStreamITCase extends StreamingProgramTestBase {
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(TwitterExampleData.STREAMING_COUNTS_AS_TUPLES, resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- TwitterExample.main(new String[]{"--output", resultPath});
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/SessionWindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/SessionWindowingITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/SessionWindowingITCase.java
deleted file mode 100644
index 768ed11..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/SessionWindowingITCase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.windowing;
-
-import org.apache.flink.streaming.examples.windowing.SessionWindowing;
-import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-/**
- * Tests for {@link SessionWindowing}.
- */
-public class SessionWindowingITCase extends StreamingProgramTestBase {
-
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- SessionWindowing.main(new String[]{"--output", resultPath});
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/WindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/WindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/WindowWordCountITCase.java
deleted file mode 100644
index 0025d94..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/WindowWordCountITCase.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.windowing;
-
-import org.apache.flink.streaming.examples.windowing.WindowWordCount;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-/**
- * Tests for {@link WindowWordCount}.
- */
-public class WindowWordCountITCase extends StreamingProgramTestBase {
-
- protected String textPath;
- protected String resultPath;
- protected String windowSize = "250";
- protected String slideSize = "150";
-
- @Override
- protected void preSubmit() throws Exception {
- textPath = createTempFile("text.txt", WordCountData.TEXT);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- // since the parallel tokenizers might have different speed
- // the exact output can not be checked just whether it is well-formed
- // checks that the result lines look like e.g. (faust, 2)
- checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
- }
-
- @Override
- protected void testProgram() throws Exception {
- WindowWordCount.main(new String[]{
- "--input", textPath,
- "--output", resultPath,
- "--window", windowSize,
- "--slide", slideSize});
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/PojoExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/PojoExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/PojoExampleITCase.java
deleted file mode 100644
index 76adb0d..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/PojoExampleITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.wordcount;
-
-import org.apache.flink.streaming.examples.wordcount.PojoExample;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-/**
- * Tests for {@link PojoExample}.
- */
-public class PojoExampleITCase extends StreamingProgramTestBase {
-
- protected String textPath;
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- textPath = createTempFile("text.txt", WordCountData.TEXT);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- PojoExample.main(new String[]{
- "--input", textPath,
- "--output", resultPath});
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1cf16207/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/WordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/WordCountITCase.java
deleted file mode 100644
index 96dfafb..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/wordcount/WordCountITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.examples.wordcount;
-
-import org.apache.flink.streaming.examples.wordcount.WordCount;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.apache.flink.test.testdata.WordCountData;
-
-/**
- * Tests for {@link WordCount}.
- */
-public class WordCountITCase extends StreamingProgramTestBase {
-
- protected String textPath;
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- textPath = createTempFile("text.txt", WordCountData.TEXT);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- WordCount.main(new String[]{
- "--input", textPath,
- "--output", resultPath});
- }
-}