Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:29 UTC

[13/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
deleted file mode 100644
index 988e7ec..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ /dev/null
@@ -1,543 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.lang
-import org.apache.flink.api.common.functions._
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.functions.co.CoMapFunction
-import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator}
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
-import org.apache.flink.streaming.api.windowing.triggers.{PurgingTrigger, CountTrigger}
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.runtime.partitioner._
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.util.Collector
-
-import org.junit.Assert.fail
-import org.junit.Test
-
-class DataStreamTest extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testNaming(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val source1Operator = env.generateSequence(0, 0).name("testSource1")
-    val source1 = source1Operator
-    assert("testSource1" == source1Operator.getName)
-
-    val dataStream1 = source1
-      .map(x => 0L)
-      .name("testMap")
-    assert("testMap" == dataStream1.getName)
-
-    val dataStream2 = env.generateSequence(0, 0).name("testSource2")
-      .keyBy(x=>x)
-      .reduce((x, y) => 0)
-      .name("testReduce")
-    assert("testReduce" == dataStream2.getName)
-
-    val connected = dataStream1.connect(dataStream2)
-      .flatMap({ (in, out: Collector[(Long, Long)]) => }, { (in, out: Collector[(Long, Long)]) => })
-      .name("testCoFlatMap")
-
-    assert("testCoFlatMap" == connected.getName)
-
-    val func: (((Long, Long), (Long, Long)) => (Long, Long)) =
-      (x: (Long, Long), y: (Long, Long)) => (0L, 0L)
-
-    val windowed = connected
-      .windowAll(GlobalWindows.create())
-      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](10)))
-      .fold((0L, 0L), func)
-
-    windowed.name("testWindowFold")
-
-    assert("testWindowFold" == windowed.getName)
-
-    windowed.print()
-
-    val plan = env.getExecutionPlan
-
-    assert(plan contains "testSource1")
-    assert(plan contains "testSource2")
-    assert(plan contains "testMap")
-    assert(plan contains "testReduce")
-    assert(plan contains "testCoFlatMap")
-    assert(plan contains "testWindowFold")
-  }
-
-  /**
-   * Tests that {@link DataStream#keyBy} and {@link DataStream#partitionByHash} result in
-   * different and correct topologies. Does the same for the {@link ConnectedStreams}.
-   */
-  @Test
-  def testPartitioning(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val src1: DataStream[(Long, Long)] = env.fromElements((0L, 0L))
-    val src2: DataStream[(Long, Long)] = env.fromElements((0L, 0L))
-
-    val connected = src1.connect(src2)
-
-    val group1 = src1.keyBy(0)
-    val group2 = src1.keyBy(1, 0)
-    val group3 = src1.keyBy("_1")
-    val group4 = src1.keyBy(x => x._1)
-
-    val gid1 = createDownStreamId(group1)
-    val gid2 = createDownStreamId(group2)
-    val gid3 = createDownStreamId(group3)
-    val gid4 = createDownStreamId(group4)
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, gid1)))
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, gid2)))
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, gid3)))
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, gid4)))
-
-    // Testing DataStream partitioning
-    val partition1: DataStream[_] = src1.partitionByHash(0)
-    val partition2: DataStream[_] = src1.partitionByHash(1, 0)
-    val partition3: DataStream[_] = src1.partitionByHash("_1")
-    val partition4: DataStream[_] = src1.partitionByHash((x : (Long, Long)) => x._1)
-
-    val pid1 = createDownStreamId(partition1)
-    val pid2 = createDownStreamId(partition2)
-    val pid3 = createDownStreamId(partition3)
-    val pid4 = createDownStreamId(partition4)
-
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, pid1)))
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, pid2)))
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, pid3)))
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, pid4)))
-
-    // Testing DataStream custom partitioning
-    val longPartitioner: Partitioner[Long] = new Partitioner[Long] {
-      override def partition(key: Long, numPartitions: Int): Int = 0
-    }
-
-    val customPartition1: DataStream[_] =
-      src1.partitionCustom(longPartitioner, 0)
-    val customPartition3: DataStream[_] =
-      src1.partitionCustom(longPartitioner, "_1")
-    val customPartition4: DataStream[_] =
-      src1.partitionCustom(longPartitioner, (x : (Long, Long)) => x._1)
-
-    val cpid1 = createDownStreamId(customPartition1)
-    val cpid2 = createDownStreamId(customPartition3)
-    val cpid3 = createDownStreamId(customPartition4)
-    assert(isCustomPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, cpid1)))
-    assert(isCustomPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, cpid2)))
-    assert(isCustomPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, cpid3)))
-
-    // Testing ConnectedStreams grouping
-    val connectedGroup1: ConnectedStreams[_, _] = connected.keyBy(0, 0)
-    val downStreamId1: Integer = createDownStreamId(connectedGroup1)
-
-    val connectedGroup2: ConnectedStreams[_, _] = connected.keyBy(Array[Int](0), Array[Int](0))
-    val downStreamId2: Integer = createDownStreamId(connectedGroup2)
-
-    val connectedGroup3: ConnectedStreams[_, _] = connected.keyBy("_1", "_1")
-    val downStreamId3: Integer = createDownStreamId(connectedGroup3)
-
-    val connectedGroup4: ConnectedStreams[_, _] =
-      connected.keyBy(Array[String]("_1"), Array[String]("_1"))
-    val downStreamId4: Integer = createDownStreamId(connectedGroup4)
-
-    val connectedGroup5: ConnectedStreams[_, _] = connected.keyBy(x => x._1, x => x._1)
-    val downStreamId5: Integer = createDownStreamId(connectedGroup5)
-
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, downStreamId1)))
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, downStreamId1)))
-
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, downStreamId2)))
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, downStreamId2)))
-
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, downStreamId3)))
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, downStreamId3)))
-
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, downStreamId4)))
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, downStreamId4)))
-
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, downStreamId5)))
-    assert(isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, downStreamId5)))
-
-    // Testing ConnectedStreams partitioning
-    val connectedPartition1: ConnectedStreams[_, _] = connected.partitionByHash(0, 0)
-    val connectDownStreamId1: Integer = createDownStreamId(connectedPartition1)
-
-    val connectedPartition2: ConnectedStreams[_, _] =
-      connected.partitionByHash(Array[Int](0), Array[Int](0))
-    val connectDownStreamId2: Integer = createDownStreamId(connectedPartition2)
-
-    val connectedPartition3: ConnectedStreams[_, _] = connected.partitionByHash("_1", "_1")
-    val connectDownStreamId3: Integer = createDownStreamId(connectedPartition3)
-
-    val connectedPartition4: ConnectedStreams[_, _] =
-      connected.partitionByHash(Array[String]("_1"), Array[String]("_1"))
-    val connectDownStreamId4: Integer = createDownStreamId(connectedPartition4)
-
-    val connectedPartition5: ConnectedStreams[_, _] =
-      connected.partitionByHash(x => x._1, x => x._1)
-    val connectDownStreamId5: Integer = createDownStreamId(connectedPartition5)
-
-    assert(
-      isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, connectDownStreamId1))
-    )
-    assert(
-      isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, connectDownStreamId1))
-    )
-
-    assert(
-      isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, connectDownStreamId2))
-    )
-    assert(
-      isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, connectDownStreamId2))
-    )
-
-    assert(
-      isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, connectDownStreamId3))
-    )
-    assert(
-      isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, connectDownStreamId3))
-    )
-
-    assert(
-      isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, connectDownStreamId4))
-    )
-    assert(
-      isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, connectDownStreamId4))
-    )
-
-    assert(
-      isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, connectDownStreamId5))
-    )
-    assert(
-      isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, connectDownStreamId5))
-    )
-  }
-
-  /**
-   * Tests whether parallelism gets set.
-   */
-  @Test
-  def testParallelism() {
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(10)
-
-    val src = env.fromElements(new Tuple2[Long, Long](0L, 0L))
-    val map = src.map(x => (0L, 0L))
-    val windowed: DataStream[(Long, Long)] = map
-      .windowAll(GlobalWindows.create())
-      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](10)))
-      .fold((0L, 0L), (x: (Long, Long), y: (Long, Long)) => (0L, 0L))
-
-    windowed.print()
-    val sink = map.addSink(x => {})
-
-    assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
-    assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
-
-    try {
-      src.setParallelism(3)
-      fail()
-    }
-    catch {
-      case success: IllegalArgumentException => {
-      }
-    }
-
-    env.setParallelism(7)
-    // the parallelism does not change since some windowing code takes the parallelism from
-    // input operations and that cannot change dynamically
-    assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
-    assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
-
-    val parallelSource = env.generateSequence(0, 0)
-    parallelSource.print()
-
-    assert(7 == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism)
-
-    parallelSource.setParallelism(3)
-    assert(3 == env.getStreamGraph.getStreamNode(parallelSource.getId).getParallelism)
-
-    map.setParallelism(2)
-    assert(2 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
-
-    sink.setParallelism(4)
-    assert(4 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
-  }
-
-  @Test
-  def testTypeInfo() {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val src1: DataStream[Long] = env.generateSequence(0, 0)
-    assert(TypeExtractor.getForClass(classOf[Long]) == src1.getType)
-
-    val map: DataStream[(Integer, String)] = src1.map(x => null)
-    assert(classOf[scala.Tuple2[Integer, String]] == map.getType().getTypeClass)
-
-    val window: DataStream[String] = map
-      .windowAll(GlobalWindows.create())
-      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
-      .apply((w: GlobalWindow, x: Iterable[(Integer, String)], y: Collector[String]) => {})
-
-    assert(TypeExtractor.getForClass(classOf[String]) == window.getType)
-
-    val flatten: DataStream[Int] = window
-      .windowAll(GlobalWindows.create())
-      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
-      .fold(0, (accumulator: Int, value: String) => 0)
-    assert(TypeExtractor.getForClass(classOf[Int]) == flatten.getType())
-
-    // TODO check for custom case class
-  }
-
-  @Test def operatorTest() {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val src = env.generateSequence(0, 0)
-
-    val mapFunction = new MapFunction[Long, Int] {
-      override def map(value: Long): Int = 0
-    }
-
-    val map = src.map(mapFunction)
-    assert(mapFunction == getFunctionForDataStream(map))
-    assert(getFunctionForDataStream(map.map(x => 0)).isInstanceOf[MapFunction[_, _]])
-    
-    val statefulMap2 = src.keyBy(x => x).mapWithState(
-        (in, state: Option[Long]) => (in, None.asInstanceOf[Option[Long]]))
-    
-    val flatMapFunction = new FlatMapFunction[Long, Int] {
-      override def flatMap(value: Long, out: Collector[Int]): Unit = {}
-    }
-    
-    val flatMap = src.flatMap(flatMapFunction)
-    assert(flatMapFunction == getFunctionForDataStream(flatMap))
-    assert(
-      getFunctionForDataStream(flatMap
-        .flatMap((x: Int, out: Collector[Int]) => {}))
-        .isInstanceOf[FlatMapFunction[_, _]])
-
-    val statefulfMap2 = src.keyBy(x => x).flatMapWithState(
-        (in, state: Option[Long]) => (List(in), None.asInstanceOf[Option[Long]]))
-   
-    val filterFunction = new FilterFunction[Int] {
-      override def filter(value: Int): Boolean = false
-    }
-
-    val unionFilter = map.union(flatMap).filter(filterFunction)
-    assert(filterFunction == getFunctionForDataStream(unionFilter))
-    assert(
-      getFunctionForDataStream(map
-        .filter((x: Int) => true))
-        .isInstanceOf[FilterFunction[_]])
-
-    val statefulFilter2 = src.keyBy( x => x).filterWithState[Long](
-        (in, state: Option[Long]) => (false, None))
-   
-    try {
-      env.getStreamGraph.getStreamEdge(map.getId, unionFilter.getId)
-    }
-    catch {
-      case e: Throwable => {
-        fail(e.getMessage)
-      }
-    }
-
-    try {
-      env.getStreamGraph.getStreamEdge(flatMap.getId, unionFilter.getId)
-    }
-    catch {
-      case e: Throwable => {
-        fail(e.getMessage)
-      }
-    }
-
-    val outputSelector = new OutputSelector[Int] {
-      override def select(value: Int): lang.Iterable[String] = null
-    }
-
-    val split = unionFilter.split(outputSelector)
-    split.print()
-    val outputSelectors = env.getStreamGraph.getStreamNode(unionFilter.getId).getOutputSelectors
-    assert(1 == outputSelectors.size)
-    assert(outputSelector == outputSelectors.get(0))
-
-    unionFilter.split(x => List("a")).print()
-    val moreOutputSelectors = env.getStreamGraph.getStreamNode(unionFilter.getId).getOutputSelectors
-    assert(2 == moreOutputSelectors.size)
-
-    val select = split.select("a")
-    val sink = select.print()
-    val splitEdge =
-      env.getStreamGraph.getStreamEdge(unionFilter.getId, sink.getTransformation.getId)
-    assert("a" == splitEdge.getSelectedNames.get(0))
-
-    val foldFunction = new FoldFunction[Int, String] {
-      override def fold(accumulator: String, value: Int): String = ""
-    }
-    val fold = map.keyBy(x=>x).fold("", foldFunction)
-    assert(foldFunction == getFunctionForDataStream(fold))
-    assert(
-      getFunctionForDataStream(map.keyBy(x=>x)
-        .fold("", (x: String, y: Int) => ""))
-        .isInstanceOf[FoldFunction[_, _]])
-
-    val connect = fold.connect(flatMap)
-
-    val coMapFunction =
-      new CoMapFunction[String, Int, String] {
-        override def map1(value: String): String = ""
-
-        override def map2(value: Int): String = ""
-      }
-    val coMap = connect.map(coMapFunction)
-    assert(coMapFunction == getFunctionForDataStream(coMap))
-
-    try {
-      env.getStreamGraph.getStreamEdge(fold.getId, coMap.getId)
-    }
-    catch {
-      case e: Throwable => {
-        fail(e.getMessage)
-      }
-    }
-    try {
-      env.getStreamGraph.getStreamEdge(flatMap.getId, coMap.getId)
-    }
-    catch {
-      case e: Throwable => {
-        fail(e.getMessage)
-      }
-    }
-  }
-
-  @Test
-  def testChannelSelectors() {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val src = env.generateSequence(0, 0)
-
-    val broadcast = src.broadcast
-    val broadcastSink = broadcast.print()
-    val broadcastPartitioner = env.getStreamGraph
-      .getStreamEdge(src.getId, broadcastSink.getTransformation.getId).getPartitioner
-    assert(broadcastPartitioner.isInstanceOf[BroadcastPartitioner[_]])
-
-    val shuffle: DataStream[Long] = src.shuffle
-    val shuffleSink = shuffle.print()
-    val shufflePartitioner = env.getStreamGraph
-      .getStreamEdge(src.getId, shuffleSink.getTransformation.getId).getPartitioner
-    assert(shufflePartitioner.isInstanceOf[ShufflePartitioner[_]])
-
-    val forward: DataStream[Long] = src.forward
-    val forwardSink = forward.print()
-    val forwardPartitioner = env.getStreamGraph
-      .getStreamEdge(src.getId, forwardSink.getTransformation.getId).getPartitioner
-    assert(forwardPartitioner.isInstanceOf[ForwardPartitioner[_]])
-
-    val rebalance: DataStream[Long] = src.rebalance
-    val rebalanceSink = rebalance.print()
-    val rebalancePartitioner = env.getStreamGraph
-      .getStreamEdge(src.getId, rebalanceSink.getTransformation.getId).getPartitioner
-    assert(rebalancePartitioner.isInstanceOf[RebalancePartitioner[_]])
-
-    val global: DataStream[Long] = src.global
-    val globalSink = global.print()
-    val globalPartitioner = env.getStreamGraph
-      .getStreamEdge(src.getId, globalSink.getTransformation.getId).getPartitioner
-    assert(globalPartitioner.isInstanceOf[GlobalPartitioner[_]])
-  }
-
-  @Test
-  def testIterations() {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    // we need to rebalance before iteration
-    val source = env.fromElements(1, 2, 3).map { t: Int => t }
-
-    val iterated = source.iterate((input: ConnectedStreams[Int, String]) => {
-      val head = input.map(i => (i + 1).toString, s => s)
-      (head.filter(_ == "2"), head.filter(_ != "2"))
-    }, 1000).print()
-
-    val iterated2 = source.iterate((input: DataStream[Int]) => 
-      (input.map(_ + 1), input.map(_.toString)), 2000)
-
-    try {
-      val invalid = source.iterate((input: ConnectedStreams[Int, String]) => {
-        val head = input.partitionByHash(1, 1).map(i => (i + 1).toString, s => s)
-        (head.filter(_ == "2"), head.filter(_ != "2"))
-      }, 1000).print()
-      fail()
-    } catch {
-      case uoe: UnsupportedOperationException =>
-      case e: Exception => fail()
-    }
-
-    val sg = env.getStreamGraph
-
-    assert(sg.getIterationSourceSinkPairs().size() == 2)
-  }
-
-  /////////////////////////////////////////////////////////////
-  // Utilities
-  /////////////////////////////////////////////////////////////
-
-  private def getFunctionForDataStream(dataStream: DataStream[_]): Function = {
-    dataStream.print()
-    val operator = getOperatorForDataStream(dataStream)
-      .asInstanceOf[AbstractUdfStreamOperator[_, _]]
-    operator.getUserFunction.asInstanceOf[Function]
-  }
-
-  private def getOperatorForDataStream(dataStream: DataStream[_]): StreamOperator[_] = {
-    dataStream.print()
-    val env = dataStream.getJavaStream.getExecutionEnvironment
-    val streamGraph: StreamGraph = env.getStreamGraph
-    streamGraph.getStreamNode(dataStream.getId).getOperator
-  }
-
-  private def isPartitioned(edge: StreamEdge): Boolean = {
-    edge.getPartitioner.isInstanceOf[HashPartitioner[_]]
-  }
-
-  private def isCustomPartitioned(edge: StreamEdge): Boolean = {
-    edge.getPartitioner.isInstanceOf[CustomPartitionerWrapper[_, _]]
-  }
-
-  private def createDownStreamId(dataStream: DataStream[_]): Integer = {
-    dataStream.print().getTransformation.getId
-  }
-
-  private def createDownStreamId(dataStream: ConnectedStreams[_, _]): Integer = {
-    val m = dataStream.map(x => 0, x => 0)
-    m.print()
-    m.getId
-  }
-
-}
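
For context on the API the deleted DataStreamTest above exercises: testPartitioning checks that keyBy (which yields a KeyedStream and hash-partitions records by key) and the plain repartitioning calls partitionByHash / partitionCustom (which only change routing and return an ordinary DataStream) produce different edges in the stream graph. The following minimal, self-contained sketch of that distinction is illustrative only, not part of the commit; it reuses only calls that appear in the test itself and assumes the streaming Scala API of this Flink version, with object and job names invented for the example.

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object PartitioningSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val src = env.fromElements((1L, 10L), (2L, 20L), (1L, 30L))

    // keyBy returns a KeyedStream: records with the same key go to the same parallel
    // instance, which is what enables keyed state and keyed aggregations such as reduce.
    val summed = src.keyBy(_._1).reduce((a, b) => (a._1, a._2 + b._2))

    // partitionByHash only re-routes records by a hash of the given field; the result
    // is still a plain DataStream, so no keyed operations are available afterwards.
    val hashed = src.partitionByHash(0).map(x => x)

    // partitionCustom routes each record with a user-supplied Partitioner.
    val custom = src.partitionCustom(new Partitioner[Long] {
      override def partition(key: Long, numPartitions: Int): Int =
        (key % numPartitions).toInt
    }, 0).map(x => x)

    summed.print()
    hashed.print()
    custom.print()
    env.execute("partitioning sketch")
  }
}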

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
deleted file mode 100644
index e09f164..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.streaming.util.SocketOutputTestBase.DummyStringSchema
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema
-import org.apache.flink.test.util.MultipleProgramsTestBase
-
-import scala.language.existentials
-
-/**
- * Test programs for built-in output formats. Invoked from {@link OutputFormatTest}.
- */
-object OutputFormatTestPrograms {
-
-  def wordCountToText(input : String, outputPath : String) : Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    // Build a word-count stream: tokenize the input, emit (word, 1) pairs, and sum per word
-    val text = env.fromElements(input)
-    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-      .map { (_, 1) }
-      .keyBy(0)
-      .sum(1)
-
-    counts.writeAsText(outputPath)
-
-    env.execute("Scala WordCountToText")
-  }
-
-  def wordCountToCsv(input : String, outputPath : String) : Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    // Build a word-count stream: tokenize the input, emit (word, 1) pairs, and sum per word
-    val text = env.fromElements(input)
-    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-      .map { (_, 1) }
-      .keyBy(0)
-      .sum(1)
-
-    counts.writeAsCsv(outputPath)
-
-    env.execute("Scala WordCountToCsv")
-  }
-
-  def wordCountToSocket(input : String, outputHost : String, outputPort : Int) : Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    // Build a word-count stream: tokenize the input, emit (word, 1) pairs, and sum per word
-    val text = env.fromElements(input)
-    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-      .map { (_, 1) }
-      .keyBy(0)
-      .sum(1)
-      .map(tuple => tuple.toString() + "\n")
-
-    counts.writeToSocket(outputHost, outputPort, new DummyStringSchema())
-
-    env.execute("Scala WordCountToCsv")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
deleted file mode 100644
index 3342e1e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.streaming.util.TestStreamEnvironment
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.junit.JUnitSuiteLike
-
-trait ScalaStreamingMultipleProgramsTestBase
-  extends TestBaseUtils
-  with  JUnitSuiteLike
-  with BeforeAndAfterAll {
-
-  val parallelism = 4
-  var cluster: Option[ForkableFlinkMiniCluster] = None
-
-  override protected def beforeAll(): Unit = {
-    // assign the member (not a shadowing local val) so afterAll() can actually stop the cluster
-    cluster = Some(
-      TestBaseUtils.startCluster(
-        1,
-        parallelism,
-        StreamingMode.STREAMING,
-        false,
-        false,
-        true
-      )
-    )
-
-    val clusterEnvironment = new TestStreamEnvironment(cluster.get, parallelism)
-  }
-
-  override protected def afterAll(): Unit = {
-    cluster.foreach {
-      TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
deleted file mode 100644
index b2e05b3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala
-
-import java.util
-
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-
-/**
- * Test programs for stateful functions.
- */
-object StateTestPrograms {
-
-  def testStatefulFunctions(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    
-    // test stateful map
-    env.generateSequence(0, 10).setParallelism(1)
-      .map { v => (1, v) }.setParallelism(1)
-      .keyBy(_._1)
-      .mapWithState((in, count: Option[Long]) =>
-        count match {
-          case Some(c) => (in._2 - c, Some(c + 1))
-          case None => (in._2, Some(1L))
-        }).setParallelism(1)
-      
-      .addSink(new RichSinkFunction[Long]() {
-        var allZero = true
-        override def invoke(in: Long) = {
-          if (in != 0) allZero = false
-        }
-        override def close() = {
-          assert(allZero)
-        }
-      })
-
-    // test stateful flatmap
-    env.fromElements((1, "First"), (2, "Second"), (1, "Hello world"))
-      .keyBy(_._1)
-      .flatMapWithState((w, s: Option[String]) =>
-        s match {
-          case Some(state) => (w._2.split(" ").toList.map(state + _), Some(w._2))
-          case None => (List(w._2), Some(w._2))
-        })
-      .setParallelism(1)
-      
-      .addSink(new RichSinkFunction[String]() {
-        val received = new util.HashSet[String]()
-        override def invoke(in: String) = { received.add(in) }
-        override def close() = {
-          assert(received.size() == 4)
-          assert(received.contains("First"))
-          assert(received.contains("Second"))
-          assert(received.contains("FirstHello"))
-          assert(received.contains("Firstworld"))
-        }
-      }).setParallelism(1)
-
-    // test stateful filter
-    env.generateSequence(1, 10).keyBy(_ % 2).filterWithState((in, state: Option[Int]) =>
-      state match {
-        case Some(s) => (s < 2, Some(s + 1))
-        case None => (true, Some(1))
-      }).addSink(new RichSinkFunction[Long]() {
-      var numOdd = 0
-      var numEven = 0
-      override def invoke(in: Long) = {
-        if (in % 2 == 0) { numEven += 1 } else { numOdd += 1 }
-      }
-      override def close() = {
-        assert(numOdd == 2)
-        assert(numEven == 2)
-      }
-    }).setParallelism(1)
-
-    env.execute("Stateful test")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
deleted file mode 100644
index 2131026..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import org.apache.flink.api.common.functions.{RichMapFunction, FoldFunction}
-import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.rules.TemporaryFolder
-import org.junit.{After, Before, Rule, Test}
-
-class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
-
-  var resultPath1: String = _
-  var resultPath2: String = _
-  var expected1: String = _
-  var expected2: String = _
-
-  val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder: TemporaryFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    val temp = tempFolder
-    resultPath1 = temp.newFile.toURI.toString
-    resultPath2 = temp.newFile.toURI.toString
-    expected1 = ""
-    expected2 = ""
-  }
-
-  @After
-  def after(): Unit = {
-    TestBaseUtils.compareResultsByLinesInMemory(expected1, resultPath1)
-    TestBaseUtils.compareResultsByLinesInMemory(expected2, resultPath2)
-  }
-
-  /** Tests the streaming fold operation. For this purpose a stream of Tuple[Int, Int] is created.
-    * The stream is grouped by the first field. For each group, the resulting stream is folded by
-    * summing up the second tuple field.
-    *
-    */
-  @Test
-  def testFoldOperator(): Unit = {
-    val numElements = 10
-    val numKeys = 2
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    env.setParallelism(2)
-
-    val sourceStream = env.addSource(new SourceFunction[(Int, Int)] {
-
-      override def run(ctx: SourceContext[(Int, Int)]): Unit = {
-        0 until numElements foreach {
-          i => ctx.collect((i % numKeys, i))
-        }
-      }
-
-      override def cancel(): Unit = {}
-    })
-
-    val splittedResult = sourceStream
-      .keyBy(0)
-      .fold(0, new FoldFunction[(Int, Int), Int] {
-        override def fold(accumulator: Int, value: (Int, Int)): Int = {
-          accumulator + value._2
-        }
-      })
-      .map(new RichMapFunction[Int, (Int, Int)] {
-        override def map(value: Int): (Int, Int) = {
-          (getRuntimeContext.getIndexOfThisSubtask, value)
-        }
-      })
-      .split{
-        x =>
-          Seq(x._1.toString)
-      }
-
-    splittedResult
-      .select("0")
-      .map(_._2)
-      .getJavaStream
-      .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE)
-    splittedResult
-      .select("1")
-      .map(_._2)
-      .getJavaStream
-      .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE)
-
-    val groupedSequence = 0 until numElements groupBy( _ % numKeys)
-
-    expected1 = groupedSequence(0).scanLeft(0)(_ + _).tail.mkString("\n")
-    expected2 = groupedSequence(1).scanLeft(0)(_ + _).tail.mkString("\n")
-
-    env.execute()
-  }
-}
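
The javadoc on testFoldOperator above describes the keyed fold as a per-key running sum. A compact sketch of just that operation is given below; it is illustrative only, uses the same era's fold(initialValue, fun) signature that appears in the test, and the object and job names are invented for the example.

import org.apache.flink.streaming.api.scala._

object KeyedFoldSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // (i % 2, i) for i in 0..9: key 0 sees 0,2,4,6,8 and key 1 sees 1,3,5,7,9.
    val src = env.fromCollection((0 until 10).map(i => (i % 2, i)))

    // Per key, fold emits the running sum after every element,
    // e.g. key 0 produces 0, 2, 6, 12, 20 and key 1 produces 1, 4, 9, 16, 25.
    src.keyBy(0)
      .fold(0, (acc: Int, value: (Int, Int)) => acc + value._2)
      .print()

    env.execute("keyed fold sketch")
  }
}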

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
deleted file mode 100644
index 101f3b5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.scala
-
-import java.lang.reflect.Method
-
-import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-
-import scala.language.existentials
-
-import org.junit.Test
-
-/**
- * This checks whether the streaming Scala API is up to feature parity with the Java API.
- * Implements the {@link ScalaAPICompletenessTest} for streaming.
- */
-class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
-
-  override def isExcludedByName(method: Method): Boolean = {
-    val name = method.getDeclaringClass.getName + "." + method.getName
-    val excludedNames = Seq(
-      // These are only used internally. Should be internal API but Java doesn't have
-      // private[flink].
-      "org.apache.flink.streaming.api.datastream.DataStream.getExecutionEnvironment",
-      "org.apache.flink.streaming.api.datastream.DataStream.getType",
-      "org.apache.flink.streaming.api.datastream.DataStream.copy",
-      "org.apache.flink.streaming.api.datastream.DataStream.transform",
-      "org.apache.flink.streaming.api.datastream.DataStream.getTransformation",
-      "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy",
-      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
-      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
-      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getFirstInput",
-      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getSecondInput",
-      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType1",
-      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
-      "org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
-      "org.apache.flink.streaming.api.datastream.ConnectedStreams.transform",
-
-      "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
-      "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
-
-      "org.apache.flink.streaming.api.datastream.WindowedStream.getExecutionEnvironment",
-      "org.apache.flink.streaming.api.datastream.WindowedStream.getInputType",
-      "org.apache.flink.streaming.api.datastream.AllWindowedStream.getExecutionEnvironment",
-      "org.apache.flink.streaming.api.datastream.AllWindowedStream.getInputType",
-
-      "org.apache.flink.streaming.api.datastream.KeyedStream.transform",
-      "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector",
-
-      "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isChainingEnabled",
-      "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment." +
-        "getStateHandleProvider",
-      "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getCheckpointInterval",
-      "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addOperator",
-      "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getCheckpointingMode",
-      "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment." +
-        "isForceCheckpointing",
-
-
-      // TypeHints are only needed for Java API, Scala API doesn't need them
-      "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.returns",
-
-      // Deactivated until Scala API has new windowing API
-      "org.apache.flink.streaming.api.datastream.DataStream.timeWindowAll",
-      "org.apache.flink.streaming.api.datastream.DataStream.windowAll"
-    )
-    val excludedPatterns = Seq(
-      // We don't have project on tuples in the Scala API
-      """^org\.apache\.flink\.streaming.api.*project""",
-
-      // Cleaning is easier in the Scala API
-      """^org\.apache\.flink\.streaming.api.*clean""",
-
-      // Object methods
-      """^.*notify""",
-      """^.*wait""",
-      """^.*notifyAll""",
-      """^.*equals""",
-      """^.*toString""",
-      """^.*getClass""",
-      """^.*hashCode"""
-    ).map(_.r)
-    lazy val excludedByPattern =
-      excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).nonEmpty
-    name.contains("$") || excludedNames.contains(name) || excludedByPattern
-  }
-
-  @Test
-  override def testCompleteness(): Unit = {
-    checkMethods("DataStream", "DataStream", classOf[JavaStream[_]], classOf[DataStream[_]])
-
-    checkMethods(
-      "StreamExecutionEnvironment", "StreamExecutionEnvironment",
-      classOf[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment],
-      classOf[StreamExecutionEnvironment])
-
-    checkMethods(
-      "SingleOutputStreamOperator", "DataStream",
-      classOf[org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[_,_]],
-      classOf[DataStream[_]])
-
-    checkMethods(
-      "ConnectedStreams", "ConnectedStreams",
-      classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_,_]],
-      classOf[ConnectedStreams[_,_]])
-
-    checkMethods(
-      "SplitStream", "SplitStream",
-      classOf[org.apache.flink.streaming.api.datastream.SplitStream[_]],
-      classOf[SplitStream[_]])
-
-    checkMethods(
-      "WindowedStream", "WindowedStream",
-      classOf[org.apache.flink.streaming.api.datastream.WindowedStream[_, _, _]],
-      classOf[WindowedStream[_, _, _]])
-
-    checkMethods(
-      "AllWindowedStream", "AllWindowedStream",
-      classOf[org.apache.flink.streaming.api.datastream.AllWindowedStream[_, _]],
-      classOf[AllWindowedStream[_, _]])
-
-    checkMethods(
-      "KeyedStream", "KeyedStream",
-      classOf[org.apache.flink.streaming.api.datastream.KeyedStream[_, _]],
-      classOf[KeyedStream[_, _]])
-
-    checkMethods(
-      "JoinedStreams.WithWindow", "JoinedStreams.WithWindow",
-      classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_,_,_,_]],
-      classOf[JoinedStreams.WithWindow[_,_,_,_]])
-
-    checkMethods(
-      "CoGroupedStreams.WithWindow", "CoGroupedStreams.WithWindow",
-      classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,_,_,_]],
-      classOf[CoGroupedStreams.WithWindow[_,_,_,_]])
-  }
-}
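
The completeness test above delegates to ScalaAPICompletenessTestBase, whose internals are not shown in this diff. As a rough, assumption-laden sketch of the underlying idea (reflect over the Java class's public methods, drop excluded names and patterns, and check that each remaining name has a counterpart on the Scala class), one could write the following; the exclusion data and helper names here are hypothetical and only mirror the shape of isExcludedByName above.

import java.lang.reflect.Method

object CompletenessSketch {
  // Hypothetical exclusion rules, mirroring the shape of isExcludedByName above.
  private val excludedNames = Set("org.example.JavaApi.internalOnly")
  private val excludedPatterns = Seq("""^.*toString""", """^.*hashCode""").map(_.r)

  private def isExcluded(m: Method): Boolean = {
    val name = m.getDeclaringClass.getName + "." + m.getName
    name.contains("$") ||
      excludedNames.contains(name) ||
      excludedPatterns.exists(_.findFirstIn(name).isDefined)
  }

  /** Returns the Java-API method names that have no same-named counterpart on the Scala class. */
  def missingInScala(javaClass: Class[_], scalaClass: Class[_]): Set[String] = {
    val scalaNames = scalaClass.getMethods.map(_.getName).toSet
    javaClass.getMethods
      .filterNot(isExcluded)
      .map(_.getName)
      .filterNot(scalaNames.contains)
      .toSet
  }
}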

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
deleted file mode 100644
index d4e8bb2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.TimestampExtractor
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
-import org.junit.Assert._
-
-import scala.collection.mutable
-
-/**
- * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions
- * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
- */
-class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testFoldWindow(): Unit = {
-    WindowFoldITCase.testResults = mutable.MutableList()
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setParallelism(1)
-
-    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
-      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
-        ctx.collect(("a", 0))
-        ctx.collect(("a", 1))
-        ctx.collect(("a", 2))
-        ctx.collect(("b", 3))
-        ctx.collect(("b", 4))
-        ctx.collect(("b", 5))
-        ctx.collect(("a", 6))
-        ctx.collect(("a", 7))
-        ctx.collect(("a", 8))
-      }
-
-      def cancel() {
-      }
-    }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor)
-
-    source1
-      .keyBy(0)
-      .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-      .fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) })
-      .addSink(new SinkFunction[(String, Int)]() {
-        def invoke(value: (String, Int)) {
-        WindowFoldITCase.testResults += value.toString
-        }
-      })
-
-    env.execute("Fold Window Test")
-
-    val expectedResult = mutable.MutableList(
-      "(R:aaa,3)",
-      "(R:aaa,21)",
-      "(R:bbb,12)")
-
-    assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
-  }
-
-  @Test
-  def testFoldAllWindow(): Unit = {
-    WindowFoldITCase.testResults = mutable.MutableList()
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setParallelism(1)
-
-    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
-      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
-        ctx.collect(("a", 0))
-        ctx.collect(("a", 1))
-        ctx.collect(("a", 2))
-        ctx.collect(("b", 3))
-        ctx.collect(("a", 3))
-        ctx.collect(("b", 4))
-        ctx.collect(("a", 4))
-        ctx.collect(("b", 5))
-        ctx.collect(("a", 5))
-      }
-
-      def cancel() {
-      }
-    }).assignTimestamps(new WindowFoldITCase.Tuple2TimestampExtractor)
-
-    source1
-      .windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-      .fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) })
-      .addSink(new SinkFunction[(String, Int)]() {
-      def invoke(value: (String, Int)) {
-        WindowFoldITCase.testResults += value.toString
-      }
-    })
-
-    env.execute("Fold All-Window Test")
-
-    val expectedResult = mutable.MutableList(
-      "(R:aaa,3)",
-      "(R:bababa,24)")
-
-    assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
-  }
-
-}
-
-
-object WindowFoldITCase {
-  private var testResults: mutable.MutableList[String] = null
-
-  private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] {
-    def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = {
-      element._2
-    }
-
-    def extractWatermark(element: (String, Int), currentTimestamp: Long): Long = {
-      element._2 - 1
-    }
-
-    def getCurrentWatermark: Long = {
-      Long.MinValue
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
deleted file mode 100644
index 46981ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
-import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer}
-import org.apache.flink.streaming.runtime.operators.windowing.{EvictingWindowOperator, WindowOperator, AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.util.Collector
-
-import org.junit.Assert._
-import org.junit.Test
-
-class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
-
-  /**
-   * These tests ensure that the fast aligned time windows operator is used if the
-   * conditions are right.
-   */
-  @Test
-  def testFastTimeWindows(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val reducer = new DummyReducer
-
-    val window1 = source
-      .keyBy(0)
-      .window(SlidingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .reduce(reducer)
-
-    val transform1 = window1.getJavaStream.getTransformation
-        .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-    
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[AggregatingProcessingTimeWindowOperator[_, _]])
-
-    val window2 = source
-      .keyBy(0)
-      .window(SlidingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-        def apply(
-            key: Tuple,
-            window: TimeWindow,
-            values: java.lang.Iterable[(String, Int)],
-            out: Collector[(String, Int)]) { }
-      })
-
-    val transform2 = window2.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator2 = transform2.getOperator
-
-    assertTrue(operator2.isInstanceOf[AccumulatingProcessingTimeWindowOperator[_, _, _]])
-  }
-
-  @Test
-  def testNonEvicting(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val reducer = new DummyReducer
-
-    val window1 = source
-      .keyBy(0)
-      .window(SlidingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .trigger(CountTrigger.of(100))
-      .reduce(reducer)
-
-    val transform1 = window1.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
-    assertTrue(
-      winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
-
-
-    val window2 = source
-      .keyBy(0)
-      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-      def apply(
-                    tuple: Tuple,
-                    window: TimeWindow,
-                    values: java.lang.Iterable[(String, Int)],
-                    out: Collector[(String, Int)]) { }
-    })
-
-    val transform2 = window2.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator2 = transform2.getOperator
-
-    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
-  }
-
-  @Test
-  def testEvicting(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val reducer = new DummyReducer
-
-    val window1 = source
-      .keyBy(0)
-      .window(SlidingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
-      .reduce(reducer)
-
-    val transform1 = window1.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger])
-    assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
-    assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
-
-
-    val window2 = source
-      .keyBy(0)
-      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .evictor(CountEvictor.of(1000))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-      def apply(
-                    tuple: Tuple,
-                    window: TimeWindow,
-                    values: java.lang.Iterable[(String, Int)],
-                    out: Collector[(String, Int)]) { }
-    })
-
-    val transform2 = window2.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator2 = transform2.getOperator
-
-    assertTrue(operator2.isInstanceOf[EvictingWindowOperator[_, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
-  }
-
-  @Test
-  def testPreReduce(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val source = env.fromElements(("hello", 1), ("hello", 2))
-
-    val reducer = new DummyReducer
-
-    val window1 = source
-      .keyBy(0)
-      .window(SlidingTimeWindows.of(
-        Time.of(1, TimeUnit.SECONDS),
-        Time.of(100, TimeUnit.MILLISECONDS)))
-      .trigger(CountTrigger.of(100))
-      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-        def apply(
-                   tuple: Tuple,
-                   window: TimeWindow,
-                   values: java.lang.Iterable[(String, Int)],
-                   out: Collector[(String, Int)]) { }
-      })
-
-    val transform1 = window1.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator1 = transform1.getOperator
-
-    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
-    assertTrue(
-      winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
-
-
-    val window2 = source
-      .keyBy(0)
-      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      .trigger(CountTrigger.of(100))
-      .apply(reducer, new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
-        def apply(
-                   tuple: Tuple,
-                   window: TimeWindow,
-                   values: java.lang.Iterable[(String, Int)],
-                   out: Collector[(String, Int)]) { }
-      })
-
-    val transform2 = window2.getJavaStream.getTransformation
-      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
-
-    val operator2 = transform2.getOperator
-
-    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
-    val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
-    assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
-    assertTrue(
-      winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/pom.xml b/flink-staging/flink-streaming/pom.xml
deleted file mode 100644
index aa233c8..0000000
--- a/flink-staging/flink-streaming/pom.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-staging</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-streaming-parent</artifactId>
-	<name>flink-streaming</name>
-	<packaging>pom</packaging>
-
-	<modules>
-		<module>flink-streaming-core</module>
-		<module>flink-streaming-scala</module>
-		<module>flink-streaming-examples</module>
-		<module>flink-streaming-connectors</module>
-	</modules>
-	
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index 7bc76a7..8483bfa 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -38,7 +38,6 @@ under the License.
 		<module>flink-avro</module>
 		<module>flink-jdbc</module>
 		<module>flink-hadoop-compatibility</module>
-		<module>flink-streaming</module>
 		<module>flink-hbase</module>
 		<module>flink-hcatalog</module>
 		<module>flink-table</module>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml b/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
new file mode 100644
index 0000000..f3efe2e
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/pom.xml
@@ -0,0 +1,106 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-elasticsearch</artifactId>
+	<name>flink-connector-elasticsearch</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<elasticsearch.version>1.7.1</elasticsearch.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${elasticsearch.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<rerunFailingTestsCount>3</rerunFailingTestsCount>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-failsafe-plugin</artifactId>
+				<configuration>
+					<rerunFailingTestsCount>3</rerunFailingTestsCount>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
new file mode 100644
index 0000000..546ec8d
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSink.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.collect.ImmutableList;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.node.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+
+/**
+ * Sink that emits its input elements to an Elasticsearch cluster.
+ *
+ * <p>
+ * When using the first constructor {@link #ElasticsearchSink(java.util.Map, IndexRequestBuilder)}
+ * the sink will create a local {@link Node} for communicating with the
+ * Elasticsearch cluster. When using the second constructor
+ * {@link #ElasticsearchSink(java.util.Map, java.util.List, IndexRequestBuilder)} a
+ * {@link TransportClient} will be used instead.
+ *
+ * <p>
+ * <b>Attention: </b> When using the {@code TransportClient} the sink will fail if it cannot
+ * connect to a cluster. With the {@code Node Client} the sink will block and wait for a cluster
+ * to come online.
+ *
+ * <p>
+ * The {@link Map} passed to the constructor is forwarded to Elasticsearch when creating
+ * the {@link Node} or {@link TransportClient}. The config keys can be found in the Elasticsearch
+ * documentation. An important setting is {@code cluster.name}, this should be set to the name
+ * of the cluster that the sink should emit to.
+ *
+ * <p>
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link IndexRequest IndexRequests}.
+ * This will buffer elements before sending a request to the cluster. The behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * <ul>
+ *   <li> {@code bulk.flush.max.actions}: Maximum number of elements to buffer
+ *   <li> {@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *   <li> {@code bulk.flush.interval.ms}: Interval (in milliseconds) at which to flush data,
+ *   regardless of the other two settings
+ * </ul>
+ *
+ * <p>
+ * You also have to provide an {@link IndexRequestBuilder}. This is used to create an
+ * {@link IndexRequest} from an element that needs to be added to Elasticsearch. See
+ * {@link org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder} for an example.
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class ElasticsearchSink<T> extends RichSinkFunction<T> {
+
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+	public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+	public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
+
+	/**
+	 * The user specified config map that we forward to Elasticsearch when we create the Client.
+	 */
+	private final Map<String, String> userConfig;
+
+	/**
+	 * The list of nodes that the TransportClient should connect to. This is null if we are using
+	 * an embedded Node to get a Client.
+	 */
+	private final List<TransportAddress> transportNodes;
+
+	/**
+	 * The builder that is used to construct an {@link IndexRequest} from the incoming element.
+	 */
+	private final IndexRequestBuilder<T> indexRequestBuilder;
+
+	/**
+	 * The embedded Node that is used to communicate with the Elasticsearch cluster. This is null
+	 * if we are using a TransportClient.
+	 */
+	private transient Node node;
+
+	/**
+	 * The Client that was either retrieved from a Node or is a TransportClient.
+	 */
+	private transient Client client;
+
+	/**
+	 * Bulk processor that was created using the client
+	 */
+	private transient BulkProcessor bulkProcessor;
+
+	/**
+	 * This is set from inside the BulkProcessor listener if there were failures in processing.
+	 */
+	private final AtomicBoolean hasFailure = new AtomicBoolean(false);
+
+	/**
+	 * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
+	 */
+	private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+	/**
+	 * Creates a new ElasticsearchSink that connects to the cluster using an embedded Node.
+	 *
+	 * @param userConfig The map of user settings that are passed when constructing the Node and BulkProcessor
+	 * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
+	 */
+	public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
+		this.userConfig = userConfig;
+		this.indexRequestBuilder = indexRequestBuilder;
+		transportNodes = null;
+	}
+
+	/**
+	 * Creates a new ElasticsearchSink that connects to the cluster using a TransportClient.
+	 *
+	 * @param userConfig The map of user settings that are passed when constructing the TransportClient and BulkProcessor
+	 * @param transportNodes The Elasticsearch Nodes to which to connect using a {@code TransportClient}
+	 * @param indexRequestBuilder This is used to generate the IndexRequest from the incoming element
+	 *
+	 */
+	public ElasticsearchSink(Map<String, String> userConfig, List<TransportAddress> transportNodes, IndexRequestBuilder<T> indexRequestBuilder) {
+		this.userConfig = userConfig;
+		this.indexRequestBuilder = indexRequestBuilder;
+		this.transportNodes = transportNodes;
+	}
+
+	/**
+	 * Initializes the connection to Elasticsearch by either creating an embedded
+	 * {@link org.elasticsearch.node.Node} and retrieving the
+	 * {@link org.elasticsearch.client.Client} from it or by creating a
+	 * {@link org.elasticsearch.client.transport.TransportClient}.
+	 */
+	@Override
+	public void open(Configuration configuration) {
+		if (transportNodes == null) {
+			// Make sure that we disable http access to our embedded node
+			Settings settings =
+					ImmutableSettings.settingsBuilder()
+							.put(userConfig)
+							.put("http.enabled", false)
+							.build();
+
+			node =
+					nodeBuilder()
+							.settings(settings)
+							.client(true)
+							.data(false)
+							.node();
+
+			client = node.client();
+
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Created Elasticsearch Client {} from embedded Node", client);
+			}
+
+		} else {
+			Settings settings = ImmutableSettings.settingsBuilder()
+					.put(userConfig)
+					.build();
+
+			TransportClient transportClient = new TransportClient(settings);
+			for (TransportAddress transport: transportNodes) {
+				transportClient.addTransportAddress(transport);
+			}
+
+			// verify that we actually are connected to a cluster
+			ImmutableList<DiscoveryNode> nodes = transportClient.connectedNodes();
+			if (nodes.isEmpty()) {
+				throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
+			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Connected to nodes: " + nodes.toString());
+				}
+			}
+
+			client = transportClient;
+
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Created Elasticsearch TransportClient {}", client);
+			}
+		}
+
+		BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
+				client,
+				new BulkProcessor.Listener() {
+					@Override
+					public void beforeBulk(long executionId,
+							BulkRequest request) {
+
+					}
+
+					@Override
+					public void afterBulk(long executionId,
+							BulkRequest request,
+							BulkResponse response) {
+						if (response.hasFailures()) {
+							for (BulkItemResponse itemResp : response.getItems()) {
+								if (itemResp.isFailed()) {
+									LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+									failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
+								}
+							}
+							hasFailure.set(true);
+						}
+					}
+
+					@Override
+					public void afterBulk(long executionId,
+							BulkRequest request,
+							Throwable failure) {
+						LOG.error(failure.getMessage());
+						failureThrowable.compareAndSet(null, failure);
+						hasFailure.set(true);
+					}
+				});
+
+		// This makes flush() blocking
+		bulkProcessorBuilder.setConcurrentRequests(0);
+
+		ParameterTool params = ParameterTool.fromMap(userConfig);
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+			bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+			bulkProcessorBuilder.setBulkSize(new ByteSizeValue(params.getInt(
+					CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
+		}
+
+		if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+			bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
+		}
+
+		bulkProcessor = bulkProcessorBuilder.build();
+	}
+
+	@Override
+	public void invoke(T element) {
+		IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Emitting IndexRequest: {}", indexRequest);
+		}
+
+		bulkProcessor.add(indexRequest);
+	}
+
+	@Override
+	public void close() {
+		if (bulkProcessor != null) {
+			bulkProcessor.close();
+			bulkProcessor = null;
+		}
+
+		if (client != null) {
+			client.close();
+		}
+
+		if (node != null) {
+			node.close();
+		}
+
+		if (hasFailure.get()) {
+			Throwable cause = failureThrowable.get();
+			if (cause != null) {
+				throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
+			} else {
+				throw new RuntimeException("An error occured in ElasticsearchSink.");
+
+			}
+		}
+	}
+
+}
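
For reference, a minimal usage sketch of the new sink with the TransportClient constructor and the
bulk-flush keys documented above. The cluster name, host, port, and index/type values are
placeholders, and "stream" is assumed to be an existing DataStream<String>; besides the imports
used in the example class further down, it needs java.util.ArrayList/List and
org.elasticsearch.common.transport.InetSocketTransportAddress.

    Map<String, String> config = new HashMap<>();
    config.put("cluster.name", "my-cluster");                                  // placeholder cluster name
    config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1000");   // flush every 1000 elements
    config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "5000");   // or at least every 5 seconds

    List<TransportAddress> transports = new ArrayList<>();
    transports.add(new InetSocketTransportAddress("es-host", 9300));           // placeholder address

    stream.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
        @Override
        public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
            Map<String, Object> json = new HashMap<>();
            json.put("data", element);
            return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
        }
    }));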

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
new file mode 100644
index 0000000..04ae40a
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/IndexRequestBuilder.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.elasticsearch.action.index.IndexRequest;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates an {@link IndexRequest} from an element in a Stream.
+ *
+ * <p>
+ * This is used by {@link org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink}
+ * to prepare elements for sending them to Elasticsearch. See
+ * <a href="https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index_.html">Index API</a>
+ * for information about how to format data for adding it to an Elasticsearch index.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *     private static class MyIndexRequestBuilder implements IndexRequestBuilder<String> {
+ *
+ *         public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
+ *             Map<String, Object> json = new HashMap<>();
+ *             json.put("data", element);
+ *
+ *             return Requests.indexRequest()
+ *                 .index("my-index")
+ *                 .type("my-type")
+ *                 .source(json);
+ *         }
+ *     }
+ * }</pre>
+ *
+ * @param <T> The type of the element handled by this {@code IndexRequestBuilder}
+ */
+public interface IndexRequestBuilder<T> extends Function, Serializable {
+
+	/**
+	 * Creates an {@link org.elasticsearch.action.index.IndexRequest} from an element.
+	 *
+	 * @param element The element that needs to be turned in to an {@code IndexRequest}
+	 * @param ctx The Flink {@link RuntimeContext} of the {@link ElasticsearchSink}
+	 *
+	 * @return The constructed {@code IndexRequest}
+	 */
+	IndexRequest createIndexRequest(T element, RuntimeContext ctx);
+}
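
The RuntimeContext argument gives the builder access to task information. A small sketch (class and
field names are illustrative only) that additionally records which parallel subtask produced each
document:

    public class SubtaskTaggingIndexRequestBuilder implements IndexRequestBuilder<String> {

        @Override
        public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
            Map<String, Object> json = new HashMap<>();
            json.put("data", element);
            json.put("subtask", ctx.getIndexOfThisSubtask());   // index of the producing parallel subtask
            return Requests.indexRequest()
                    .index("my-index")
                    .type("my-type")
                    .source(json);
        }
    }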

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
new file mode 100644
index 0000000..298eb64
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/examples/ElasticsearchExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.examples;
+
+import com.google.common.collect.Maps;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch.IndexRequestBuilder;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that
+ * you have a cluster named "elasticsearch" running, or change the cluster name in the config map.
+ */
+public class ElasticsearchExample {
+
+	public static void main(String[] args) throws Exception {
+		
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {
+			private static final long serialVersionUID = 1L;
+
+			private volatile boolean running = true;
+
+			@Override
+			public void run(SourceContext<String> ctx) throws Exception {
+				for (int i = 0; i < 20 && running; i++) {
+					ctx.collect("message #" + i);
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
+		Map<String, String> config = Maps.newHashMap();
+		// This instructs the sink to emit after every element, otherwise they would be buffered
+		config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+		source.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
+			@Override
+			public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
+				Map<String, Object> json = new HashMap<>();
+				json.put("data", element);
+
+				return Requests.indexRequest()
+						.index("my-index")
+						.type("my-type")
+						.source(json);
+			}
+		}));
+
+
+		env.execute("Elasticsearch Example");
+	}
+}