You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:29 UTC
[13/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
deleted file mode 100644
index 988e7ec..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ /dev/null
@@ -1,543 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.lang
-import org.apache.flink.api.common.functions._
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.functions.co.CoMapFunction
-import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator}
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
-import org.apache.flink.streaming.api.windowing.triggers.{PurgingTrigger, CountTrigger}
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.runtime.partitioner._
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.util.Collector
-
-import org.junit.Assert.fail
-import org.junit.Test
-
-class DataStreamTest extends StreamingMultipleProgramsTestBase {
-
-  /**
-   * Names every operator of a small topology and checks that each name is reported by the
-   * stream itself and shows up in the generated execution plan.
-   */
-  @Test
-  def testNaming(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val firstSource = env.generateSequence(0, 0).name("testSource1")
-    assert("testSource1" == firstSource.getName)
-
-    val mapped = firstSource.map(x => 0L).name("testMap")
-    assert("testMap" == mapped.getName)
-
-    val reduced = env.generateSequence(0, 0).name("testSource2")
-      .keyBy(x => x)
-      .reduce((x, y) => 0)
-      .name("testReduce")
-    assert("testReduce" == reduced.getName)
-
-    val coFlatMapped = mapped.connect(reduced)
-      .flatMap(
-        { (in, out: Collector[(Long, Long)]) => },
-        { (in, out: Collector[(Long, Long)]) => })
-      .name("testCoFlatMap")
-    assert("testCoFlatMap" == coFlatMapped.getName)
-
-    // Fold function that ignores its inputs; only the operator naming matters here.
-    val constantFold: ((Long, Long), (Long, Long)) => (Long, Long) =
-      (acc, value) => (0L, 0L)
-
-    val folded = coFlatMapped
-      .windowAll(GlobalWindows.create())
-      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](10)))
-      .fold((0L, 0L), constantFold)
-
-    // name() is invoked as a separate statement on the already built stream.
-    folded.name("testWindowFold")
-    assert("testWindowFold" == folded.getName)
-
-    folded.print()
-
-    val plan = env.getExecutionPlan
-    val expectedNames = Seq(
-      "testSource1",
-      "testSource2",
-      "testMap",
-      "testReduce",
-      "testCoFlatMap",
-      "testWindowFold")
-    expectedNames.foreach { operatorName =>
-      assert(plan contains operatorName)
-    }
-  }
-
- /**
- * Tests that {@link DataStream#keyBy} and {@link DataStream#partitionBy(KeySelector)} result in
- * different and correct topologies. Does the same for the {@link ConnectedStreams}.
- */
-  @Test
-  def testPartitioning(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val src1: DataStream[(Long, Long)] = env.fromElements((0L, 0L))
-    val src2: DataStream[(Long, Long)] = env.fromElements((0L, 0L))
-
-    val connected = src1.connect(src2)
-
-    // Looks up the edge source -> targetId in a stream graph generated at call time.
-    def edgeFrom(source: DataStream[_], targetId: Integer): StreamEdge =
-      env.getStreamGraph.getStreamEdge(source.getId, targetId)
-
-    // Testing DataStream keying: by index, by several indices, by field name, by key selector
-    val keyedByIndex = src1.keyBy(0)
-    val keyedByIndices = src1.keyBy(1, 0)
-    val keyedByField = src1.keyBy("_1")
-    val keyedBySelector = src1.keyBy(x => x._1)
-
-    val keyedIds = Seq[DataStream[_]](keyedByIndex, keyedByIndices, keyedByField, keyedBySelector)
-      .map(stream => createDownStreamId(stream))
-    keyedIds.foreach { id =>
-      assert(isPartitioned(edgeFrom(src1, id)))
-    }
-
-    // Testing DataStream partitioning
-    val hashedByIndex: DataStream[_] = src1.partitionByHash(0)
-    val hashedByIndices: DataStream[_] = src1.partitionByHash(1, 0)
-    val hashedByField: DataStream[_] = src1.partitionByHash("_1")
-    val hashedBySelector: DataStream[_] = src1.partitionByHash((x: (Long, Long)) => x._1)
-
-    val hashedIds =
-      Seq[DataStream[_]](hashedByIndex, hashedByIndices, hashedByField, hashedBySelector)
-        .map(stream => createDownStreamId(stream))
-    hashedIds.foreach { id =>
-      assert(isPartitioned(edgeFrom(src1, id)))
-    }
-
-    // Testing DataStream custom partitioning
-    val longPartitioner: Partitioner[Long] = new Partitioner[Long] {
-      override def partition(key: Long, numPartitions: Int): Int = 0
-    }
-
-    val customByIndex: DataStream[_] =
-      src1.partitionCustom(longPartitioner, 0)
-    val customByField: DataStream[_] =
-      src1.partitionCustom(longPartitioner, "_1")
-    val customBySelector: DataStream[_] =
-      src1.partitionCustom(longPartitioner, (x: (Long, Long)) => x._1)
-
-    val customIds = Seq[DataStream[_]](customByIndex, customByField, customBySelector)
-      .map(stream => createDownStreamId(stream))
-    customIds.foreach { id =>
-      assert(isCustomPartitioned(edgeFrom(src1, id)))
-    }
-
-    // Testing ConnectedStreams grouping
-    val keyedPair1: ConnectedStreams[_, _] = connected.keyBy(0, 0)
-    val keyedPairId1: Integer = createDownStreamId(keyedPair1)
-
-    val keyedPair2: ConnectedStreams[_, _] = connected.keyBy(Array[Int](0), Array[Int](0))
-    val keyedPairId2: Integer = createDownStreamId(keyedPair2)
-
-    val keyedPair3: ConnectedStreams[_, _] = connected.keyBy("_1", "_1")
-    val keyedPairId3: Integer = createDownStreamId(keyedPair3)
-
-    val keyedPair4: ConnectedStreams[_, _] =
-      connected.keyBy(Array[String]("_1"), Array[String]("_1"))
-    val keyedPairId4: Integer = createDownStreamId(keyedPair4)
-
-    val keyedPair5: ConnectedStreams[_, _] = connected.keyBy(x => x._1, x => x._1)
-    val keyedPairId5: Integer = createDownStreamId(keyedPair5)
-
-    // Both inputs of every keyed co-operator must be hash partitioned.
-    for {
-      id <- Seq[Integer](keyedPairId1, keyedPairId2, keyedPairId3, keyedPairId4, keyedPairId5)
-      source <- Seq(src1, src2)
-    } {
-      assert(isPartitioned(edgeFrom(source, id)))
-    }
-
-    // Testing ConnectedStreams partitioning
-    val hashedPair1: ConnectedStreams[_, _] = connected.partitionByHash(0, 0)
-    val hashedPairId1: Integer = createDownStreamId(hashedPair1)
-
-    val hashedPair2: ConnectedStreams[_, _] =
-      connected.partitionByHash(Array[Int](0), Array[Int](0))
-    val hashedPairId2: Integer = createDownStreamId(hashedPair2)
-
-    val hashedPair3: ConnectedStreams[_, _] = connected.partitionByHash("_1", "_1")
-    val hashedPairId3: Integer = createDownStreamId(hashedPair3)
-
-    val hashedPair4: ConnectedStreams[_, _] =
-      connected.partitionByHash(Array[String]("_1"), Array[String]("_1"))
-    val hashedPairId4: Integer = createDownStreamId(hashedPair4)
-
-    val hashedPair5: ConnectedStreams[_, _] =
-      connected.partitionByHash(x => x._1, x => x._1)
-    val hashedPairId5: Integer = createDownStreamId(hashedPair5)
-
-    // Both inputs of every hash-partitioned co-operator must be hash partitioned.
-    for {
-      id <- Seq[Integer](hashedPairId1, hashedPairId2, hashedPairId3, hashedPairId4, hashedPairId5)
-      source <- Seq(src1, src2)
-    } {
-      assert(isPartitioned(edgeFrom(source, id)))
-    }
-  }
-
-  /**
-   * Tests whether parallelism gets set: checks the parallelism reported by the stream graph
-   * for a source, a map, an all-window fold and a sink, before and after changing the
-   * environment default and the individual operators.
-   */
-  @Test
-  def testParallelism(): Unit = {
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(10)
-
-    val source = env.fromElements((0L, 0L))
-    val mapped = source.map(x => (0L, 0L))
-    val folded: DataStream[(Long, Long)] = mapped
-      .windowAll(GlobalWindows.create())
-      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](10)))
-      .fold((0L, 0L), (x: (Long, Long), y: (Long, Long)) => (0L, 0L))
-
-    folded.print()
-    val mapSink = mapped.addSink(x => {})
-
-    // Parallelism of the node with the given id in a stream graph generated at call time.
-    def parallelismOf(nodeId: Integer) =
-      env.getStreamGraph.getStreamNode(nodeId).getParallelism
-
-    // Expected values for the four operators created above.
-    def assertInitialParallelism(): Unit = {
-      assert(1 == parallelismOf(source.getId))
-      assert(10 == parallelismOf(mapped.getId))
-      assert(1 == parallelismOf(folded.getId))
-      assert(10 == parallelismOf(mapSink.getTransformation.getId))
-    }
-
-    assertInitialParallelism()
-
-    // Setting a parallelism of 3 on this source must be rejected.
-    try {
-      source.setParallelism(3)
-      fail()
-    } catch {
-      case _: IllegalArgumentException =>
-    }
-
-    env.setParallelism(7)
-    // the parallelism does not change since some windowing code takes the parallelism from
-    // input operations and that cannot change dynamically
-    assertInitialParallelism()
-
-    val parallelSource = env.generateSequence(0, 0)
-    parallelSource.print()
-
-    assert(7 == parallelismOf(parallelSource.getId))
-
-    parallelSource.setParallelism(3)
-    assert(3 == parallelismOf(parallelSource.getId))
-
-    mapped.setParallelism(2)
-    assert(2 == parallelismOf(mapped.getId))
-
-    mapSink.setParallelism(4)
-    assert(4 == parallelismOf(mapSink.getTransformation.getId))
-  }
-
-  /**
-   * Checks the type information reported by a source, a map, a window apply and a window fold.
-   */
-  @Test
-  def testTypeInfo(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val longs: DataStream[Long] = env.generateSequence(0, 0)
-    val longType = TypeExtractor.getForClass(classOf[Long])
-    assert(longType == longs.getType)
-
-    // The declared result type, not the (null) value, drives the type information.
-    val tuples: DataStream[(Integer, String)] = longs.map(x => null)
-    val tupleClass = classOf[scala.Tuple2[Integer, String]]
-    assert(tupleClass == tuples.getType().getTypeClass)
-
-    val strings: DataStream[String] = tuples
-      .windowAll(GlobalWindows.create())
-      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
-      .apply((w: GlobalWindow, x: Iterable[(Integer, String)], y: Collector[String]) => {})
-
-    val stringType = TypeExtractor.getForClass(classOf[String])
-    assert(stringType == strings.getType)
-
-    val ints: DataStream[Int] = strings
-      .windowAll(GlobalWindows.create())
-      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
-      .fold(0, (accumulator: Int, value: String) => 0)
-    val intType = TypeExtractor.getForClass(classOf[Int])
-    assert(intType == ints.getType())
-
-    // TODO check for custom case class
-  }
-
- @Test def operatorTest() { // user-function wiring and graph edges of the basic operators
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- val src = env.generateSequence(0, 0)
- // map: the operator must report the MapFunction that was passed in
- val mapFunction = new MapFunction[Long, Int] {
- override def map(value: Long): Int = 0
- }
-
- val map = src.map(mapFunction)
- assert(mapFunction == getFunctionForDataStream(map))
- assert(getFunctionForDataStream(map.map(x => 0)).isInstanceOf[MapFunction[_, _]])
- // mapWithState on a keyed stream; the resulting stream is not inspected any further
- val statefulMap2 = src.keyBy(x => x).mapWithState(
- (in, state: Option[Long]) => (in, None.asInstanceOf[Option[Long]]))
- // flatMap: same two checks as for map (explicit function object, then a Scala function)
- val flatMapFunction = new FlatMapFunction[Long, Int] {
- override def flatMap(value: Long, out: Collector[Int]): Unit = {}
- }
-
- val flatMap = src.flatMap(flatMapFunction)
- assert(flatMapFunction == getFunctionForDataStream(flatMap))
- assert(
- getFunctionForDataStream(flatMap
- .flatMap((x: Int, out: Collector[Int]) => {}))
- .isInstanceOf[FlatMapFunction[_, _]])
- // flatMapWithState on a keyed stream; the resulting stream is not inspected any further
- val statefulfMap2 = src.keyBy(x => x).flatMapWithState(
- (in, state: Option[Long]) => (List(in), None.asInstanceOf[Option[Long]]))
- // filter: applied to the union of map and flatMap
- val filterFunction = new FilterFunction[Int] {
- override def filter(value: Int): Boolean = false
- }
-
- val unionFilter = map.union(flatMap).filter(filterFunction)
- assert(filterFunction == getFunctionForDataStream(unionFilter))
- assert(
- getFunctionForDataStream(map
- .filter((x: Int) => true))
- .isInstanceOf[FilterFunction[_]])
- // filterWithState on a keyed stream; the resulting stream is not inspected any further
- val statefulFilter2 = src.keyBy( x => x).filterWithState[Long](
- (in, state: Option[Long]) => (false, None))
- // looking up the edge map -> unionFilter must not throw; any Throwable fails the test
- try {
- env.getStreamGraph.getStreamEdge(map.getId, unionFilter.getId)
- }
- catch {
- case e: Throwable => {
- fail(e.getMessage)
- }
- }
- // same lookup for the edge flatMap -> unionFilter
- try {
- env.getStreamGraph.getStreamEdge(flatMap.getId, unionFilter.getId)
- }
- catch {
- case e: Throwable => {
- fail(e.getMessage)
- }
- }
- // split: the selector passed in must be the only one registered on the unionFilter node
- val outputSelector = new OutputSelector[Int] {
- override def select(value: Int): lang.Iterable[String] = null
- }
-
- val split = unionFilter.split(outputSelector)
- split.print()
- val outputSelectors = env.getStreamGraph.getStreamNode(unionFilter.getId).getOutputSelectors
- assert(1 == outputSelectors.size)
- assert(outputSelector == outputSelectors.get(0))
- // a second split, given as a Scala function, raises the selector count on that node to 2
- unionFilter.split(x => List("a")).print()
- val moreOutputSelectors = env.getStreamGraph.getStreamNode(unionFilter.getId).getOutputSelectors
- assert(2 == moreOutputSelectors.size)
- // select("a"): the edge from unionFilter to the print sink must carry "a" as selected name
- val select = split.select("a")
- val sink = select.print()
- val splitEdge =
- env.getStreamGraph.getStreamEdge(unionFilter.getId, sink.getTransformation.getId)
- assert("a" == splitEdge.getSelectedNames.get(0))
- // fold on a keyed stream: explicit FoldFunction, then a Scala function
- val foldFunction = new FoldFunction[Int, String] {
- override def fold(accumulator: String, value: Int): String = ""
- }
- val fold = map.keyBy(x=>x).fold("", foldFunction)
- assert(foldFunction == getFunctionForDataStream(fold))
- assert(
- getFunctionForDataStream(map.keyBy(x=>x)
- .fold("", (x: String, y: Int) => ""))
- .isInstanceOf[FoldFunction[_, _]])
- // co-map over the connected fold and flatMap streams
- val connect = fold.connect(flatMap)
-
- val coMapFunction =
- new CoMapFunction[String, Int, String] {
- override def map1(value: String): String = ""
-
- override def map2(value: Int): String = ""
- }
- val coMap = connect.map(coMapFunction)
- assert(coMapFunction == getFunctionForDataStream(coMap))
- // both edge lookups into coMap must not throw; any Throwable fails the test
- try {
- env.getStreamGraph.getStreamEdge(fold.getId, coMap.getId)
- }
- catch {
- case e: Throwable => {
- fail(e.getMessage)
- }
- }
- try {
- env.getStreamGraph.getStreamEdge(flatMap.getId, coMap.getId)
- }
- catch {
- case e: Throwable => {
- fail(e.getMessage)
- }
- }
- }
-
-  /**
-   * Checks that broadcast, shuffle, forward, rebalance and global each put the matching
-   * partitioner on the edge between the source and a print sink.
-   */
-  @Test
-  def testChannelSelectors(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val src = env.generateSequence(0, 0)
-
-    // Attaches a print sink to `stream` and returns the partitioner of the edge src -> sink.
-    def partitionerTowardsSink(stream: DataStream[Long]) = {
-      val sink = stream.print()
-      env.getStreamGraph
-        .getStreamEdge(src.getId, sink.getTransformation.getId)
-        .getPartitioner
-    }
-
-    val broadcastPartitioner = partitionerTowardsSink(src.broadcast)
-    assert(broadcastPartitioner.isInstanceOf[BroadcastPartitioner[_]])
-
-    val shufflePartitioner = partitionerTowardsSink(src.shuffle)
-    assert(shufflePartitioner.isInstanceOf[ShufflePartitioner[_]])
-
-    val forwardPartitioner = partitionerTowardsSink(src.forward)
-    assert(forwardPartitioner.isInstanceOf[ForwardPartitioner[_]])
-
-    val rebalancePartitioner = partitionerTowardsSink(src.rebalance)
-    assert(rebalancePartitioner.isInstanceOf[RebalancePartitioner[_]])
-
-    val globalPartitioner = partitionerTowardsSink(src.global)
-    assert(globalPartitioner.isInstanceOf[GlobalPartitioner[_]])
-  }
-
- @Test
- def testIterations() { // two valid iterate() calls, one that must be rejected
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- // Original note: "we need to rebalance before iteration". NOTE(review): no explicit
- // rebalance() is called here, only an identity map; presumably the map serves that
- // purpose. Confirm against the iteration requirements.
- val source = env.fromElements(1, 2, 3).map { t: Int => t }
- // step function over ConnectedStreams[Int, String]; it returns a pair of filtered streams
- val iterated = source.iterate((input: ConnectedStreams[Int, String]) => {
- val head = input.map(i => (i + 1).toString, s => s)
- (head.filter(_ == "2"), head.filter(_ != "2"))
- }, 1000).print()
- // step function over a plain DataStream[Int]
- val iterated2 = source.iterate((input: DataStream[Int]) =>
- (input.map(_ + 1), input.map(_.toString)), 2000)
- // partitionByHash on the iteration input must raise UnsupportedOperationException;
- // any other Exception, or no exception at all, fails the test
- try {
- val invalid = source.iterate((input: ConnectedStreams[Int, String]) => {
- val head = input.partitionByHash(1, 1).map(i => (i + 1).toString, s => s)
- (head.filter(_ == "2"), head.filter(_ != "2"))
- }, 1000).print()
- fail()
- } catch {
- case uoe: UnsupportedOperationException =>
- case e: Exception => fail()
- }
-
- val sg = env.getStreamGraph
- // exactly two source/sink pairs are expected, presumably one per valid iteration above
- assert(sg.getIterationSourceSinkPairs().size() == 2)
- }
-
- /////////////////////////////////////////////////////////////
- // Utilities
- /////////////////////////////////////////////////////////////
-
-  /**
-   * Returns the user function held by the operator behind `dataStream`. A print sink is
-   * attached to the stream first; the operator is cast to [[AbstractUdfStreamOperator]].
-   */
-  private def getFunctionForDataStream(dataStream: DataStream[_]): Function = {
-    dataStream.print()
-    val udfOperator =
-      getOperatorForDataStream(dataStream).asInstanceOf[AbstractUdfStreamOperator[_, _]]
-    udfOperator.getUserFunction.asInstanceOf[Function]
-  }
-
-  /**
-   * Attaches a print sink to `dataStream` and returns the operator of its node in the stream
-   * graph of the stream's own execution environment.
-   */
-  private def getOperatorForDataStream(dataStream: DataStream[_]): StreamOperator[_] = {
-    dataStream.print()
-    val graph: StreamGraph = dataStream.getJavaStream.getExecutionEnvironment.getStreamGraph
-    val node = graph.getStreamNode(dataStream.getId)
-    node.getOperator
-  }
-
-  /** True when the partitioner of `edge` is a [[HashPartitioner]]. */
-  private def isPartitioned(edge: StreamEdge): Boolean =
-    edge.getPartitioner match {
-      case _: HashPartitioner[_] => true
-      case _ => false
-    }
-
-  /** True when the partitioner of `edge` is a [[CustomPartitionerWrapper]]. */
-  private def isCustomPartitioned(edge: StreamEdge): Boolean =
-    edge.getPartitioner match {
-      case _: CustomPartitionerWrapper[_, _] => true
-      case _ => false
-    }
-
-  /** Attaches a print sink to `dataStream` and returns the id of the sink's transformation. */
-  private def createDownStreamId(dataStream: DataStream[_]): Integer = {
-    val printSink = dataStream.print()
-    printSink.getTransformation.getId
-  }
-
-  /**
-   * Puts a constant co-map behind the connected streams, attaches a print sink to it and
-   * returns the id of the co-map stream.
-   */
-  private def createDownStreamId(dataStream: ConnectedStreams[_, _]): Integer = {
-    val coMapped = dataStream.map(x => 0, x => 0)
-    coMapped.print()
-    coMapped.getId
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
deleted file mode 100644
index e09f164..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.streaming.util.SocketOutputTestBase.DummyStringSchema
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema
-import org.apache.flink.test.util.MultipleProgramsTestBase
-
-import scala.language.existentials
-
-/**
- * Test programs for built in output formats. Invoked from {@link OutputFormatTest}.
- *
- * Every program runs the same word count over a single input string; the programs differ
- * only in the sink the counts are written to.
- */
-object OutputFormatTestPrograms {
-
-  /**
-   * Builds the word-count pipeline shared by all programs: the input is lower-cased and split
-   * on non-word characters, empty tokens are dropped, and a running count per word is kept.
-   *
-   * @param env environment the pipeline is created in
-   * @param input text to count the words of; emitted as a single stream element
-   * @return stream of (word, count) pairs, keyed on the word
-   */
-  private def countWords(
-      env: StreamExecutionEnvironment,
-      input: String): DataStream[(String, Int)] = {
-    env.fromElements(input)
-      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-      .map { (_, 1) }
-      .keyBy(0)
-      .sum(1)
-  }
-
-  /**
-   * Counts the words of `input` and writes the counts as text to `outputPath`.
-   */
-  def wordCountToText(input : String, outputPath : String) : Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    countWords(env, input).writeAsText(outputPath)
-
-    env.execute("Scala WordCountToText")
-  }
-
-  /**
-   * Counts the words of `input` and writes the counts as CSV to `outputPath`.
-   */
-  def wordCountToCsv(input : String, outputPath : String) : Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    countWords(env, input).writeAsCsv(outputPath)
-
-    env.execute("Scala WordCountToCsv")
-  }
-
-  /**
-   * Counts the words of `input` and writes each count, rendered with `toString` and followed
-   * by a newline, to the socket at `outputHost`:`outputPort`.
-   */
-  def wordCountToSocket(input : String, outputHost : String, outputPort : Int) : Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val lines = countWords(env, input)
-      .map(tuple => tuple.toString() + "\n")
-
-    lines.writeToSocket(outputHost, outputPort, new DummyStringSchema())
-
-    // The job used to be submitted under the copy-pasted name "Scala WordCountToCsv".
-    env.execute("Scala WordCountToSocket")
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
deleted file mode 100644
index 3342e1e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.streaming.util.TestStreamEnvironment
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.junit.JUnitSuiteLike
-
-/**
- * Base trait for Scala streaming tests: starts one mini cluster before the tests of a suite
- * run and stops it again after the last one.
- */
-trait ScalaStreamingMultipleProgramsTestBase
-  extends TestBaseUtils
-  with JUnitSuiteLike
-  with BeforeAndAfterAll {
-
-  val parallelism = 4
-
-  /** The running mini cluster; `None` before `beforeAll` and after `afterAll`. */
-  var cluster: Option[ForkableFlinkMiniCluster] = None
-
-  /**
-   * Starts the mini cluster and stores it in the `cluster` field so that `afterAll` can stop it.
-   */
-  override protected def beforeAll(): Unit = {
-    val startedCluster = TestBaseUtils.startCluster(
-      1,
-      parallelism,
-      StreamingMode.STREAMING,
-      false,
-      false,
-      true
-    )
-
-    // Assign the field. A local `val cluster` here would shadow it, leave the field at None
-    // and make afterAll skip the shutdown, leaking the cluster.
-    cluster = Some(startedCluster)
-
-    // NOTE(review): the environment is constructed but not used any further in this trait;
-    // confirm whether it still has to be installed as the context environment.
-    val clusterEnvironment = new TestStreamEnvironment(startedCluster, parallelism)
-  }
-
-  /**
-   * Stops the mini cluster if one was started and clears the field.
-   */
-  override protected def afterAll(): Unit = {
-    cluster.foreach {
-      TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT)
-    }
-    cluster = None
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
deleted file mode 100644
index b2e05b3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala
-
-import java.util
-
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-
-/**
- * Test programs for stateful functions.
- *
- * A single job is built from three independent pipelines (stateful map, stateful flatMap and
- * stateful filter). Each pipeline ends in a sink that records what it receives and checks its
- * expectations with `assert` in `close()`.
- */
-object StateTestPrograms {
-
- def testStatefulFunctions(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- // test stateful map: all elements share the key 1 and the state counts the elements seen
- // so far. Each element is emitted minus that count, so the sink expects only zeros
- // (assumes the sequence 0..10 arrives in order; TODO confirm).
- env.generateSequence(0, 10).setParallelism(1)
- .map { v => (1, v) }.setParallelism(1)
- .keyBy(_._1)
- .mapWithState((in, count: Option[Long]) =>
- count match {
- case Some(c) => (in._2 - c, Some(c + 1))
- case None => (in._2, Some(1L))
- }).setParallelism(1)
-
- .addSink(new RichSinkFunction[Long]() {
- var allZero = true
- override def invoke(in: Long) = {
- if (in != 0) allZero = false
- }
- override def close() = {
- assert(allZero)
- }
- })
-
- // test stateful flatmap: the state holds the previous string of the key. Without state the
- // string is emitted as is, otherwise each space-separated word is emitted prefixed with
- // the stored string.
- env.fromElements((1, "First"), (2, "Second"), (1, "Hello world"))
- .keyBy(_._1)
- .flatMapWithState((w, s: Option[String]) =>
- s match {
- case Some(state) => (w._2.split(" ").toList.map(state + _), Some(w._2))
- case None => (List(w._2), Some(w._2))
- })
- .setParallelism(1)
-
- .addSink(new RichSinkFunction[String]() {
- val received = new util.HashSet[String]()
- override def invoke(in: String) = { received.add(in) }
- override def close() = {
- assert(received.size() == 4)
- assert(received.contains("First"))
- assert(received.contains("Second"))
- assert(received.contains("FirstHello"))
- assert(received.contains("Firstworld"))
- }
- }).setParallelism(1)
-
- // test stateful filter: keyed by parity, the state counts the elements seen per key and
- // only the first two elements of each key pass, so the sink expects 2 odd and 2 even values
- env.generateSequence(1, 10).keyBy(_ % 2).filterWithState((in, state: Option[Int]) =>
- state match {
- case Some(s) => (s < 2, Some(s + 1))
- case None => (true, Some(1))
- }).addSink(new RichSinkFunction[Long]() {
- var numOdd = 0
- var numEven = 0
- override def invoke(in: Long) = {
- if (in % 2 == 0) { numEven += 1 } else { numOdd += 1 }
- }
- override def close() = {
- assert(numOdd == 2)
- assert(numEven == 2)
- }
- }).setParallelism(1)
-
- env.execute("Stateful test")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
deleted file mode 100644
index 2131026..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.functions.{RichMapFunction, FoldFunction}
-import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.rules.TemporaryFolder
-import org.junit.{After, Before, Rule, Test}
-
-class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
-
- var resultPath1: String = _
- var resultPath2: String = _
- var expected1: String = _
- var expected2: String = _
-
- val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder: TemporaryFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- val temp = tempFolder
- resultPath1 = temp.newFile.toURI.toString
- resultPath2 = temp.newFile.toURI.toString
- expected1 = ""
- expected2 = ""
- }
-
- @After
- def after(): Unit = {
- TestBaseUtils.compareResultsByLinesInMemory(expected1, resultPath1)
- TestBaseUtils.compareResultsByLinesInMemory(expected2, resultPath2)
- }
-
- /** Tests the streaming fold operation. For this purpose a stream of Tuple[Int, Int] is created.
- * The stream is grouped by the first field. For each group, the resulting stream is folded by
- * summing up the second tuple field.
- *
- */
- @Test
- def testFoldOperator(): Unit = {
- val numElements = 10
- val numKeys = 2
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- env.setParallelism(2)
-
- val sourceStream = env.addSource(new SourceFunction[(Int, Int)] {
-
- override def run(ctx: SourceContext[(Int, Int)]): Unit = {
- 0 until numElements foreach {
- i => ctx.collect((i % numKeys, i))
- }
- }
-
- override def cancel(): Unit = {}
- })
-
- val splittedResult = sourceStream
- .keyBy(0)
- .fold(0, new FoldFunction[(Int, Int), Int] {
- override def fold(accumulator: Int, value: (Int, Int)): Int = {
- accumulator + value._2
- }
- })
- .map(new RichMapFunction[Int, (Int, Int)] {
- override def map(value: Int): (Int, Int) = {
- (getRuntimeContext.getIndexOfThisSubtask, value)
- }
- })
- .split{
- x =>
- Seq(x._1.toString)
- }
-
- splittedResult
- .select("0")
- .map(_._2)
- .getJavaStream
- .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE)
- splittedResult
- .select("1")
- .map(_._2)
- .getJavaStream
- .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE)
-
- val groupedSequence = 0 until numElements groupBy( _ % numKeys)
-
- expected1 = groupedSequence(0).scanLeft(0)(_ + _).tail.mkString("\n")
- expected2 = groupedSequence(1).scanLeft(0)(_ + _).tail.mkString("\n")
-
- env.execute()
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
deleted file mode 100644
index 101f3b5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala
-
-import java.lang.reflect.Method
-
-import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-
-import scala.language.existentials
-
-import org.junit.Test
-
-/**
- * This checks whether the streaming Scala API is up to feature parity with the Java API.
- * Implements the {@link ScalaAPICompletenessTest} for streaming.
- */
-class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
-
- override def isExcludedByName(method: Method): Boolean = {
- val name = method.getDeclaringClass.getName + "." + method.getName
- val excludedNames = Seq(
- // These are only used internally. Should be internal API but Java doesn't have
- // private[flink].
- "org.apache.flink.streaming.api.datastream.DataStream.getExecutionEnvironment",
- "org.apache.flink.streaming.api.datastream.DataStream.getType",
- "org.apache.flink.streaming.api.datastream.DataStream.copy",
- "org.apache.flink.streaming.api.datastream.DataStream.transform",
- "org.apache.flink.streaming.api.datastream.DataStream.getTransformation",
- "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy",
- "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
- "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
- "org.apache.flink.streaming.api.datastream.ConnectedStreams.getFirstInput",
- "org.apache.flink.streaming.api.datastream.ConnectedStreams.getSecondInput",
- "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType1",
- "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
- "org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
- "org.apache.flink.streaming.api.datastream.ConnectedStreams.transform",
-
- "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
- "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
-
- "org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionEnvironment",
- "org.apache.flink.streaming.api.datastream.WindowedStream.getInputType",
- "org.apache.flink.streaming.api.datastream.AllWindowedStream.getExecutionEnvironment",
- "org.apache.flink.streaming.api.datastream.AllWindowedStream.getInputType",
-
- "org.apache.flink.streaming.api.datastream.KeyedStream.transform",
- "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector",
-
- "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isChainingEnabled",
- "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment." +
- "getStateHandleProvider",
- "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getCheckpointInterval",
- "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addOperator",
- "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getCheckpointingMode",
- "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment." +
- "isForceCheckpointing",
-
-
- // TypeHints are only needed for Java API, Scala API doesn't need them
- "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.returns",
-
- // Deactivated until Scala API has new windowing API
- "org.apache.flink.streaming.api.datastream.DataStream.timeWindowAll",
- "org.apache.flink.streaming.api.datastream.DataStream.windowAll"
- )
- val excludedPatterns = Seq(
- // We don't have project on tuples in the Scala API
- """^org\.apache\.flink\.streaming.api.*project""",
-
- // Cleaning is easier in the Scala API
- """^org\.apache\.flink\.streaming.api.*clean""",
-
- // Object methods
- """^.*notify""",
- """^.*wait""",
- """^.*notifyAll""",
- """^.*equals""",
- """^.*toString""",
- """^.*getClass""",
- """^.*hashCode"""
- ).map(_.r)
- lazy val excludedByPattern =
- excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).nonEmpty
- name.contains("$") || excludedNames.contains(name) || excludedByPattern
- }
-
- @Test
- override def testCompleteness(): Unit = {
- checkMethods("DataStream", "DataStream", classOf[JavaStream[_]], classOf[DataStream[_]])
-
- checkMethods(
- "StreamExecutionEnvironment", "StreamExecutionEnvironment",
- classOf[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment],
- classOf[StreamExecutionEnvironment])
-
- checkMethods(
- "SingleOutputStreamOperator", "DataStream",
- classOf[org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[_,_]],
- classOf[DataStream[_]])
-
- checkMethods(
- "ConnectedStreams", "ConnectedStreams",
- classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_,_]],
- classOf[ConnectedStreams[_,_]])
-
- checkMethods(
- "SplitStream", "SplitStream",
- classOf[org.apache.flink.streaming.api.datastream.SplitStream[_]],
- classOf[SplitStream[_]])
-
- checkMethods(
- "WindowedStream", "WindowedStream",
- classOf[org.apache.flink.streaming.api.datastream.WindowedStream[_, _, _]],
- classOf[WindowedStream[_, _, _]])
-
- checkMethods(
- "AllWindowedStream", "AllWindowedStream",
- classOf[org.apache.flink.streaming.api.datastream.AllWindowedStream[_, _]],
- classOf[AllWindowedStream[_, _]])
-
- checkMethods(
- "KeyedStream", "KeyedStream",
- classOf[org.apache.flink.streaming.api.datastream.KeyedStream[_, _]],
- classOf[KeyedStream[_, _]])
-
- checkMethods(
- "JoinedStreams.WithWindow", "JoinedStreams.WithWindow",
- classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_,_,_,_]],
- classOf[JoinedStreams.WithWindow[_,_,_,_]])
-
- checkMethods(
- "CoGroupedStreams.WithWindow", "CoGroupedStreams.WithWindow",
- classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,_,_,_]],
- classOf[CoGroupedStreams.WithWindow[_,_,_,_]])
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
deleted file mode 100644
index d4e8bb2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.TimestampExtractor
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
-import org.junit.Assert._
-
-import scala.collection.mutable
-
-/**
- * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions
- * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
- */
-class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testFoldWindow(): Unit = {
- WindowFoldITCase.testResults = mutable.MutableList()
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setParallelism(1)
-
- val source1 = env.addSource(new SourceFunction[(String, Int)]() {
- def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
- ctx.collect(("a", 0))
- ctx.collect(("a", 1))
- ctx.collect(("a", 2))
- ctx.collect(("b", 3))
- ctx.collect(("b", 4))
- ctx.collect(("b", 5))
- ctx.collect(("a", 6))
- ctx.collect(("a", 7))
- ctx.collect(("a", 8))
- }
-
- def cancel() {
- }
- }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor)
-
- source1
- .keyBy(0)
- .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
- .fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) })
- .addSink(new SinkFunction[(String, Int)]() {
- def invoke(value: (String, Int)) {
- WindowFoldITCase.testResults += value.toString
- }
- })
-
- env.execute("Fold Window Test")
-
- val expectedResult = mutable.MutableList(
- "(R:aaa,3)",
- "(R:aaa,21)",
- "(R:bbb,12)")
-
- assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
- }
-
- @Test
- def testFoldAllWindow(): Unit = {
- WindowFoldITCase.testResults = mutable.MutableList()
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setParallelism(1)
-
- val source1 = env.addSource(new SourceFunction[(String, Int)]() {
- def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
- ctx.collect(("a", 0))
- ctx.collect(("a", 1))
- ctx.collect(("a", 2))
- ctx.collect(("b", 3))
- ctx.collect(("a", 3))
- ctx.collect(("b", 4))
- ctx.collect(("a", 4))
- ctx.collect(("b", 5))
- ctx.collect(("a", 5))
- }
-
- def cancel() {
- }
- }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor)
-
- source1
- .windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
- .fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) })
- .addSink(new SinkFunction[(String, Int)]() {
- def invoke(value: (String, Int)) {
- WindowFoldITCase.testResults += value.toString
- }
- })
-
- env.execute("Fold All-Window Test")
-
- val expectedResult = mutable.MutableList(
- "(R:aaa,3)",
- "(R:bababa,24)")
-
- assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
- }
-
-}
-
-
-object WindowFoldITCase {
- private var testResults: mutable.MutableList[String] = null
-
- private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] {
- def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = {
- element._2
- }
-
- def extractWatermark(element: (String, Int), currentTimestamp: Long): Long = {
- element._2 - 1
- }
-
- def getCurrentWatermark: Long = {
- Long.MinValue
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
deleted file mode 100644
index 46981ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
-import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer}
-import org.apache.flink.streaming.runtime.operators.windowing.{EvictingWindowOperator, WindowOperator, AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.util.Collector
-
-import org.junit.Assert._
-import org.junit.Test
-
-class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
-
- /**
- * These tests ensure that the fast aligned time windows operator is used if the
- * conditions are right.
- */
- @Test
- def testFastTimeWindows(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- val source = env.fromElements(("hello", 1), ("hello", 2))
-
- val reducer = new DummyReducer
-
- val window1 = source
- .keyBy(0)
- .window(SlidingTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .reduce(reducer)
-
- val transform1 = window1.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator1 = transform1.getOperator
-
- assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
-
- val window2 = source
- .keyBy(0)
- .window(SlidingTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
- def apply(
- key: Tuple,
- window: TimeWindow,
- values: java.lang.Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
-
- val transform2 = window2.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator2 = transform2.getOperator
-
- assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
- }
-
- @Test
- def testNonEvicting(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- val source = env.fromElements(("hello", 1), ("hello", 2))
-
- val reducer = new DummyReducer
-
- val window1 = source
- .keyBy(0)
- .window(SlidingTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .trigger(CountTrigger.of(100))
- .reduce(reducer)
-
- val transform1 = window1.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator1 = transform1.getOperator
-
- assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
- val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
- assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
- assertTrue(
- winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
-
-
- val window2 = source
- .keyBy(0)
- .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
- def apply(
- tuple: Tuple,
- window: TimeWindow,
- values: java.lang.Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
-
- val transform2 = window2.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator2 = transform2.getOperator
-
- assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
- val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
- assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
- assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
- }
-
- @Test
- def testEvicting(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- val source = env.fromElements(("hello", 1), ("hello", 2))
-
- val reducer = new DummyReducer
-
- val window1 = source
- .keyBy(0)
- .window(SlidingTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
- .reduce(reducer)
-
- val transform1 = window1.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator1 = transform1.getOperator
-
- assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
- val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
- assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
- assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
- assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
-
-
- val window2 = source
- .keyBy(0)
- .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .evictor(CountEvictor.of(1000))
- .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
- def apply(
- tuple: Tuple,
- window: TimeWindow,
- values: java.lang.Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
-
- val transform2 = window2.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator2 = transform2.getOperator
-
- assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
- val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
- assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
- assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
- }
-
- @Test
- def testPreReduce(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- val source = env.fromElements(("hello", 1), ("hello", 2))
-
- val reducer = new DummyReducer
-
- val window1 = source
- .keyBy(0)
- .window(SlidingTimeWindows.of(
- Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
- .trigger(CountTrigger.of(100))
- .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
- def apply(
- tuple: Tuple,
- window: TimeWindow,
- values: java.lang.Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
-
- val transform1 = window1.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator1 = transform1.getOperator
-
- assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
- val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
- assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
- assertTrue(
- winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
-
-
- val window2 = source
- .keyBy(0)
- .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
- def apply(
- tuple: Tuple,
- window: TimeWindow,
- values: java.lang.Iterable[(String, Int)],
- out: Collector[(String, Int)]) { }
- })
-
- val transform2 = window2.getJavaStream.getTransformation
- .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
- val operator2 = transform2.getOperator
-
- assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
- val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
- assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
- assertTrue(
- winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/pom.xml b/flink-staging/flink-streaming/pom.xml
deleted file mode 100644
index aa233c8..0000000
--- a/flink-staging/flink-streaming/pom.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-staging</artifactId>
- <version>0.10-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-streaming-parent</artifactId>
- <name>flink-streaming</name>
- <packaging>pom</packaging>
-
- <modules>
- <module>flink-streaming-core</module>
- <module>flink-streaming-scala</module>
- <module>flink-streaming-examples</module>
- <module>flink-streaming-connectors</module>
- </modules>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index 7bc76a7..8483bfa 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -38,7 +38,6 @@ under the License.
<module>flink-avro</module>
<module>flink-jdbc</module>
<module>flink-hadoop-compatibility</module>
- <module>flink-streaming</module>
<module>flink-hbase</module>
<module>flink-hcatalog</module>
<module>flink-table</module>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml b/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
new file mode 100644
index 0000000..f3efe2e
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
@@ -0,0 +1,106 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-connectors-parent</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-elasticsearch</artifactId>
+ <name>flink-connector-elasticsearch</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <elasticsearch.version>1.7.1</elasticsearch.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${elasticsearch.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <rerunFailingTestsCount>3</rerunFailingTestsCount>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <rerunFailingTestsCount>3</rerunFailingTestsCount>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
new file mode 100644
index 0000000..546ec8d
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+
+/**
+ * Sink that emits its input elements to an Elasticsearch cluster.
+ *
+ * <p>
+ * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)}
+ * the sink will create a local {@link Node} for communicating with the
+ * Elasticsearch cluster. When using the second constructor
+ * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a {@link TransportClient} will
+ * be used instead.
+ *
+ * <p>
+ * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if no cluster
+ * can be connected to. With the {@code Node Client} the sink will block and wait for a cluster
+ * to come online.
+ *
+ * <p>
+ * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
+ * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, this should be set to the name
+ * of the cluster that the sink should emit to.
+ *
+ * <p>
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ * <li> {@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ * <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ * <li> {@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
+ * settings in milliseconds
+ * </ul>
+ *
+ * <p>
+ * You also have to provide an {@link IndexRequestBuilder}. This is used to create an
+ * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
+ * {@link org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+	/** Config key for the maximum amount of elements to buffer before a bulk request is sent. */
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+
+	/** Config key for the maximum amount of data (in megabytes) to buffer before a bulk request is sent. */
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+
+	/** Config key for the interval (in milliseconds) at which to flush regardless of the other two settings. */
+	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
+
+	/**
+	 * The user specified config map that we forward to Elasticsearch when we create the Client.
+	 */
+	private final Map<String, String> userConfig;
+
+	/**
+	 * The list of nodes that the TransportClient should connect to. This is null if we are using
+	 * an embedded Node to get a Client.
+	 */
+	private final List<TransportAddress> transportNodes;
+
+	/**
+	 * The builder that is used to construct an {@link IndexRequest} from the incoming element.
+	 */
+	private final IndexRequestBuilder<T> indexRequestBuilder;
+
+	/**
+	 * The embedded Node that is used to communicate with the Elasticsearch cluster. This is null
+	 * if we are using a TransportClient.
+	 */
+	private transient Node node;
+
+	/**
+	 * The Client that was either retrieved from a Node or is a TransportClient.
+	 */
+	private transient Client client;
+
+	/**
+	 * Bulk processor that was created using the client.
+	 */
+	private transient BulkProcessor bulkProcessor;
+
+	/**
+	 * This is set from inside the BulkProcessor listener if there were failures in processing.
+	 */
+	private final AtomicBoolean hasFailure = new AtomicBoolean(false);
+
+	/**
+	 * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
+	 * Only the first recorded failure is kept.
+	 */
+	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+	/**
+	 * Creates a new ElasticsearchSink that connects to the cluster using an embedded Node.
+	 *
+	 * @param userConfig The map of user settings that are passed when constructing the Node and BulkProcessor
+	 * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
+	 */
+	public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
+		this.userConfig = userConfig;
+		this.indexRequestBuilder = indexRequestBuilder;
+		transportNodes = null;
+	}
+
+	/**
+	 * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+	 *
+	 * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor
+	 * @param transportNodes The Elasticsearch Nodes to which to connect using a {@code TransportClient}
+	 * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
+	 */
+	public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportNodes, IndexRequestBuilder<T> indexRequestBuilder) {
+		this.userConfig = userConfig;
+		this.indexRequestBuilder = indexRequestBuilder;
+		this.transportNodes = transportNodes;
+	}
+
+	/**
+	 * Initializes the connection to Elasticsearch by either creating an embedded
+	 * {@link org.elasticsearch.node.Node} and retrieving the
+	 * {@link org.elasticsearch.client.Client} from it or by creating a
+	 * {@link org.elasticsearch.client.transport.TransportClient}, and then builds the
+	 * {@link BulkProcessor} on top of that client.
+	 *
+	 * @param configuration The Flink configuration handed to the function; not read by this sink
+	 * @throws RuntimeException if a {@code TransportClient} is used and it is not connected to any node
+	 */
+	@Override
+	public void open(Configuration configuration) {
+		client = (transportNodes == null) ? createEmbeddedNodeClient() : createTransportClient();
+
+		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
+			client,
+			new BulkProcessor.Listener() {
+				@Override
+				public void beforeBulk(long executionId, BulkRequest request) {
+					// nothing to prepare before a bulk request is sent
+				}
+
+				@Override
+				public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+					if (response.hasFailures()) {
+						for (BulkItemResponse itemResp : response.getItems()) {
+							if (itemResp.isFailed()) {
+								LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+								// only the first failure is remembered as the cause
+								failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+							}
+						}
+						hasFailure.set(true);
+					}
+				}
+
+				@Override
+				public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+					// pass the Throwable itself so that the stack trace ends up in the log
+					LOG.error(failure.getMessage(), failure);
+					failureThrowable.compareAndSet(null, failure);
+					hasFailure.set(true);
+				}
+			});
+
+		// This makes flush() blocking
+		bulkProcessorBuilder.setConcurrentRequests(0);
+
+		ParameterTool params = ParameterTool.fromMap(userConfig);
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+			bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+			bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
+				CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
+		}
+
+		bulkProcessor = bulkProcessorBuilder.build();
+	}
+
+	/**
+	 * Turns the element into an {@link IndexRequest} using the {@link IndexRequestBuilder} and
+	 * hands it to the {@link BulkProcessor}.
+	 *
+	 * <p>
+	 * Failures that the bulk listener recorded for earlier requests are rethrown here, so that a
+	 * long-running stream does not keep losing documents unnoticed until {@link #close()}.
+	 *
+	 * @param element The element to emit to Elasticsearch
+	 * @throws RuntimeException if an earlier bulk request failed
+	 */
+	@Override
+	public void invoke(T element) {
+		checkErrorAndRethrow();
+
+		IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Emitting IndexRequest: {}", indexRequest);
+		}
+
+		bulkProcessor.add(indexRequest);
+	}
+
+	/**
+	 * Closes the {@link BulkProcessor}, the {@link Client} and the embedded {@link Node} (if any)
+	 * and afterwards reports failures that were recorded by the bulk listener.
+	 *
+	 * <p>
+	 * Every resource is closed even if closing an earlier one throws.
+	 *
+	 * @throws RuntimeException if a bulk request failed while the sink was running
+	 */
+	@Override
+	public void close() {
+		try {
+			if (bulkProcessor != null) {
+				bulkProcessor.close();
+				bulkProcessor = null;
+			}
+		} finally {
+			try {
+				if (client != null) {
+					client.close();
+				}
+			} finally {
+				if (node != null) {
+					node.close();
+				}
+			}
+		}
+
+		checkErrorAndRethrow();
+	}
+
+	/**
+	 * Starts an embedded client-only {@link Node} and returns its {@link Client}. The node is
+	 * stored in {@link #node} before the client is requested so that {@link #close()} can
+	 * shut it down.
+	 */
+	private Client createEmbeddedNodeClient() {
+		// Make sure that we disable http access to our embedded node
+		Settings settings = ImmutableSettings.settingsBuilder()
+			.put(userConfig)
+			.put("http.enabled", false)
+			.build();
+
+		node = nodeBuilder()
+			.settings(settings)
+			.client(true)
+			.data(false)
+			.node();
+
+		Client nodeClient = node.client();
+		LOG.info("Created Elasticsearch Client {} from embedded Node", nodeClient);
+		return nodeClient;
+	}
+
+	/**
+	 * Creates a {@link TransportClient} for the configured transport addresses and verifies
+	 * that it is connected to at least one node. The client is closed again if setting it up fails.
+	 */
+	private Client createTransportClient() {
+		Settings settings = ImmutableSettings.settingsBuilder()
+			.put(userConfig)
+			.build();
+
+		TransportClient transportClient = new TransportClient(settings);
+		boolean connected = false;
+		try {
+			for (TransportAddress transport : transportNodes) {
+				transportClient.addTransportAddress(transport);
+			}
+
+			// verify that we actually are connected to a cluster
+			ImmutableList<DiscoveryNode> nodes = transportClient.connectedNodes();
+			if (nodes.isEmpty()) {
+				throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
+			}
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Connected to nodes: " + nodes.toString());
+			}
+
+			LOG.info("Created Elasticsearch TransportClient {}", transportClient);
+			connected = true;
+			return transportClient;
+		} finally {
+			if (!connected) {
+				// the client is not yet stored in the client field, so close() could not release it
+				transportClient.close();
+			}
+		}
+	}
+
+	/**
+	 * Throws a {@link RuntimeException} if the bulk listener recorded a failure, using the
+	 * first recorded {@link Throwable} as the cause when there is one.
+	 */
+	private void checkErrorAndRethrow() {
+		if (hasFailure.get()) {
+			Throwable cause = failureThrowable.get();
+			if (cause != null) {
+				throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
+			} else {
+				throw new RuntimeException("An error occured in ElasticsearchSink.");
+			}
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
new file mode 100644
index 0000000..04ae40a
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.elasticsearch.action.index.IndexRequest;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates an {@link IndexRequest} from an element in a Stream.
+ *
+ * <p>
+ * This is used by {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink}
+ * to prepare elements for sending them to Elasticsearch. See
+ * <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index_.html">Index API</a>
+ * for information about how to format data for adding it to an Elasticsearch index.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ * private static class MyIndexRequestBuilder implements IndexRequestBuilder<String> {
+ *
+ * public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
+ * Map<String, Object> json = new HashMap<>();
+ * json.put("data", element);
+ *
+ * return Requests.indexRequest()
+ * .index("my-index")
+ * .type("my-type")
+ * .source(json);
+ * }
+ * }
+ * }</pre>
+ *
+ * @param <T> The type of the element handled by this {@code IndexRequestBuilder}
+ */
+public interface IndexRequestBuilder<T> extends Function, Serializable {
+
+ /**
+ * Creates an {@link org.elasticsearch.action.index.IndexRequest} from an element.
+ *
+ * <p>
+ * {@link ElasticsearchSink} calls this for every element it receives and adds the returned
+ * request to its bulk processor.
+ *
+ * @param element The element that needs to be turned into an {@code IndexRequest}
+ * @param ctx The Flink {@link RuntimeContext} of the {@link ElasticsearchSink}
+ *
+ * @return The constructed {@code IndexRequest}
+ */
+ IndexRequest createIndexRequest(T element, RuntimeContext ctx);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
new file mode 100644
index 0000000..298eb64
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.examples;
+
+import com.google.common.collect.Maps;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
+ * you have a cluster named "elasticsearch" running or change the cluster name in the config map.
+ */
+public class ElasticsearchExample {
+
+ public static void main(String[] args) throws Exception {
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ for (int i = 0; i < 20 && running; i++) {
+ ctx.collect("message #" + i);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+
+ Map<String, String> config = Maps.newHashMap();
+ // This instructs the sink to emit after every element, otherwise they would be buffered
+ config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+ source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
+ @Override
+ public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
+ Map<String, Object> json = new HashMap<>();
+ json.put("data", element);
+
+ return Requests.indexRequest()
+ .index("my-index")
+ .type("my-type")
+ .source(json);
+ }
+ }));
+
+
+ env.execute("Elasticsearch Example");
+ }
+}