You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:39:00 UTC

[19/19] flink git commit: [streaming] Major internal renaming and restructure

[streaming] Major internal renaming and restructure

Closes #594


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4754a97b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4754a97b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4754a97b

Branch: refs/heads/master
Commit: 4754a97b19c96647886a46131750db26a0b5f618
Parents: f1b445b
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Apr 13 20:46:33 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Apr 15 11:35:12 2015 +0200

----------------------------------------------------------------------
 .../streaming/connectors/ConnectorSource.java   |   4 +-
 .../streaming/connectors/flume/FlumeSink.java   |   2 +-
 .../connectors/kafka/KafkaProducerExample.java  |   2 +-
 .../connectors/kafka/api/KafkaSink.java         |   2 +-
 .../kafka/api/simple/PersistentKafkaSource.java |   3 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |   2 +-
 .../connectors/twitter/TwitterSource.java       |   4 +-
 .../connectors/twitter/TwitterStreaming.java    |   2 +-
 .../streaming/connectors/kafka/KafkaITCase.java |   4 +-
 .../flink/streaming/api/JSONGenerator.java      | 180 --------
 .../flink/streaming/api/StreamConfig.java       | 392 ----------------
 .../apache/flink/streaming/api/StreamEdge.java  | 120 -----
 .../apache/flink/streaming/api/StreamGraph.java | 458 -------------------
 .../apache/flink/streaming/api/StreamNode.java  | 201 --------
 .../api/StreamingJobGraphGenerator.java         | 346 --------------
 .../flink/streaming/api/WindowingOptimizer.java | 151 ------
 .../api/collector/CollectorWrapper.java         |   2 +-
 .../streaming/api/collector/StreamOutput.java   |   7 +-
 .../BroadcastOutputSelectorWrapper.java         |   2 +-
 .../selector/DirectedOutputSelectorWrapper.java |   2 +-
 .../selector/OutputSelectorWrapper.java         |   2 +-
 .../api/datastream/ConnectedDataStream.java     |  52 +--
 .../streaming/api/datastream/DataStream.java    |  82 ++--
 .../api/datastream/DataStreamSink.java          |   6 +-
 .../api/datastream/DataStreamSource.java        |   8 +-
 .../api/datastream/DiscretizedStream.java       |  44 +-
 .../api/datastream/GroupedDataStream.java       |  16 +-
 .../api/datastream/IterativeDataStream.java     |   6 +-
 .../datastream/SingleOutputStreamOperator.java  |  14 +-
 .../api/datastream/StreamProjection.java        |  52 +--
 .../api/datastream/WindowedDataStream.java      |  38 +-
 .../temporal/StreamCrossOperator.java           | 112 +++++
 .../datastream/temporal/StreamJoinOperator.java | 273 +++++++++++
 .../datastream/temporal/TemporalOperator.java   | 124 +++++
 .../api/datastream/temporal/TemporalWindow.java |  45 ++
 .../temporaloperator/StreamCrossOperator.java   | 112 -----
 .../temporaloperator/StreamJoinOperator.java    | 273 -----------
 .../temporaloperator/TemporalOperator.java      | 124 -----
 .../temporaloperator/TemporalWindow.java        |  45 --
 .../environment/StreamExecutionEnvironment.java |  32 +-
 .../api/function/RichWindowMapFunction.java     |  40 --
 .../api/function/WindowMapFunction.java         |  35 --
 .../aggregation/AggregationFunction.java        |  35 --
 .../aggregation/ComparableAggregator.java       | 238 ----------
 .../api/function/aggregation/Comparator.java    | 104 -----
 .../api/function/aggregation/SumAggregator.java | 173 -------
 .../api/function/aggregation/SumFunction.java   | 102 -----
 .../api/function/co/CoFlatMapFunction.java      |  42 --
 .../api/function/co/CoMapFunction.java          |  41 --
 .../api/function/co/CoReduceFunction.java       | 107 -----
 .../api/function/co/CoWindowFunction.java       |  30 --
 .../api/function/co/CrossWindowFunction.java    |  44 --
 .../api/function/co/JoinWindowFunction.java     |  77 ----
 .../api/function/co/RichCoFlatMapFunction.java  |  40 --
 .../api/function/co/RichCoMapFunction.java      |  40 --
 .../api/function/co/RichCoReduceFunction.java   |  40 --
 .../api/function/co/RichCoWindowFunction.java   |  34 --
 .../api/function/sink/FileSinkFunction.java     | 118 -----
 .../function/sink/FileSinkFunctionByMillis.java |  59 ---
 .../api/function/sink/PrintSinkFunction.java    |  96 ----
 .../api/function/sink/RichSinkFunction.java     |  28 --
 .../api/function/sink/SinkFunction.java         |  38 --
 .../api/function/sink/SocketClientSink.java     | 130 ------
 .../api/function/sink/WriteFormat.java          |  43 --
 .../api/function/sink/WriteFormatAsCsv.java     |  49 --
 .../api/function/sink/WriteFormatAsText.java    |  47 --
 .../api/function/sink/WriteSinkFunction.java    |  92 ----
 .../sink/WriteSinkFunctionByMillis.java         |  50 --
 .../function/source/FileMonitoringFunction.java | 133 ------
 .../api/function/source/FileReadFunction.java   |  51 ---
 .../api/function/source/FileSourceFunction.java | 135 ------
 .../function/source/FromElementsFunction.java   |  61 ---
 .../function/source/GenSequenceFunction.java    |  61 ---
 .../function/source/GenericSourceFunction.java  |  25 -
 .../function/source/ParallelSourceFunction.java |  34 --
 .../source/RichParallelSourceFunction.java      |  33 --
 .../api/function/source/RichSourceFunction.java |  46 --
 .../source/SocketTextStreamFunction.java        | 148 ------
 .../api/function/source/SourceFunction.java     |  56 ---
 .../api/functions/RichWindowMapFunction.java    |  40 ++
 .../api/functions/WindowMapFunction.java        |  35 ++
 .../aggregation/AggregationFunction.java        |  35 ++
 .../aggregation/ComparableAggregator.java       | 238 ++++++++++
 .../api/functions/aggregation/Comparator.java   | 104 +++++
 .../functions/aggregation/SumAggregator.java    | 173 +++++++
 .../api/functions/aggregation/SumFunction.java  | 102 +++++
 .../api/functions/co/CoFlatMapFunction.java     |  42 ++
 .../api/functions/co/CoMapFunction.java         |  41 ++
 .../api/functions/co/CoReduceFunction.java      | 107 +++++
 .../api/functions/co/CoWindowFunction.java      |  30 ++
 .../api/functions/co/CrossWindowFunction.java   |  44 ++
 .../api/functions/co/JoinWindowFunction.java    |  77 ++++
 .../api/functions/co/RichCoFlatMapFunction.java |  40 ++
 .../api/functions/co/RichCoMapFunction.java     |  40 ++
 .../api/functions/co/RichCoReduceFunction.java  |  40 ++
 .../api/functions/co/RichCoWindowFunction.java  |  34 ++
 .../api/functions/sink/FileSinkFunction.java    | 118 +++++
 .../sink/FileSinkFunctionByMillis.java          |  59 +++
 .../api/functions/sink/PrintSinkFunction.java   |  96 ++++
 .../api/functions/sink/RichSinkFunction.java    |  28 ++
 .../api/functions/sink/SinkFunction.java        |  38 ++
 .../api/functions/sink/SocketClientSink.java    | 130 ++++++
 .../api/functions/sink/WriteFormat.java         |  43 ++
 .../api/functions/sink/WriteFormatAsCsv.java    |  49 ++
 .../api/functions/sink/WriteFormatAsText.java   |  47 ++
 .../api/functions/sink/WriteSinkFunction.java   |  92 ++++
 .../sink/WriteSinkFunctionByMillis.java         |  50 ++
 .../source/FileMonitoringFunction.java          | 133 ++++++
 .../api/functions/source/FileReadFunction.java  |  51 +++
 .../functions/source/FileSourceFunction.java    | 135 ++++++
 .../functions/source/FromElementsFunction.java  |  61 +++
 .../functions/source/GenSequenceFunction.java   |  61 +++
 .../functions/source/GenericSourceFunction.java |  25 +
 .../source/ParallelSourceFunction.java          |  34 ++
 .../source/RichParallelSourceFunction.java      |  33 ++
 .../functions/source/RichSourceFunction.java    |  46 ++
 .../source/SocketTextStreamFunction.java        | 148 ++++++
 .../api/functions/source/SourceFunction.java    |  56 +++
 .../streaming/api/graph/JSONGenerator.java      | 180 ++++++++
 .../flink/streaming/api/graph/StreamConfig.java | 392 ++++++++++++++++
 .../flink/streaming/api/graph/StreamEdge.java   | 120 +++++
 .../flink/streaming/api/graph/StreamGraph.java  | 458 +++++++++++++++++++
 .../flink/streaming/api/graph/StreamNode.java   | 201 ++++++++
 .../api/graph/StreamingJobGraphGenerator.java   | 346 ++++++++++++++
 .../streaming/api/graph/WindowingOptimizer.java | 151 ++++++
 .../api/invokable/ChainableInvokable.java       |  57 ---
 .../streaming/api/invokable/SinkInvokable.java  |  43 --
 .../api/invokable/SourceInvokable.java          |  50 --
 .../api/invokable/StreamInvokable.java          | 209 ---------
 .../invokable/operator/CounterInvokable.java    |  45 --
 .../api/invokable/operator/FilterInvokable.java |  49 --
 .../invokable/operator/FlatMapInvokable.java    |  45 --
 .../operator/GroupedFoldInvokable.java          |  57 ---
 .../operator/GroupedReduceInvokable.java        |  52 ---
 .../api/invokable/operator/MapInvokable.java    |  45 --
 .../invokable/operator/ProjectInvokable.java    |  64 ---
 .../invokable/operator/StreamFoldInvokable.java |  54 ---
 .../operator/StreamReduceInvokable.java         |  55 ---
 .../operator/co/CoFlatMapInvokable.java         |  54 ---
 .../operator/co/CoGroupedReduceInvokable.java   |  88 ----
 .../api/invokable/operator/co/CoInvokable.java  | 155 -------
 .../invokable/operator/co/CoMapInvokable.java   |  54 ---
 .../operator/co/CoReduceInvokable.java          |  70 ---
 .../operator/co/CoWindowInvokable.java          | 197 --------
 .../operator/windowing/EmptyWindowFilter.java   |  32 --
 .../windowing/GroupedActiveDiscretizer.java     | 116 -----
 .../windowing/GroupedStreamDiscretizer.java     | 128 ------
 .../windowing/GroupedWindowBufferInvokable.java |  73 ---
 .../windowing/ParallelGroupedMerge.java         |  41 --
 .../operator/windowing/ParallelMerge.java       | 142 ------
 .../operator/windowing/StreamDiscretizer.java   | 223 ---------
 .../windowing/WindowBufferInvokable.java        |  67 ---
 .../operator/windowing/WindowFlattener.java     |  50 --
 .../operator/windowing/WindowFolder.java        |  70 ---
 .../operator/windowing/WindowMapper.java        |  64 ---
 .../operator/windowing/WindowMerger.java        |  70 ---
 .../operator/windowing/WindowPartExtractor.java |  55 ---
 .../operator/windowing/WindowPartitioner.java   |  74 ---
 .../operator/windowing/WindowReducer.java       |  69 ---
 .../api/operators/ChainableStreamOperator.java  |  57 +++
 .../streaming/api/operators/StreamCounter.java  |  44 ++
 .../streaming/api/operators/StreamFilter.java   |  48 ++
 .../streaming/api/operators/StreamFlatMap.java  |  44 ++
 .../streaming/api/operators/StreamFold.java     |  53 +++
 .../api/operators/StreamGroupedFold.java        |  57 +++
 .../api/operators/StreamGroupedReduce.java      |  52 +++
 .../streaming/api/operators/StreamMap.java      |  44 ++
 .../streaming/api/operators/StreamOperator.java | 209 +++++++++
 .../streaming/api/operators/StreamProject.java  |  63 +++
 .../streaming/api/operators/StreamReduce.java   |  54 +++
 .../streaming/api/operators/StreamSink.java     |  43 ++
 .../streaming/api/operators/StreamSource.java   |  50 ++
 .../api/operators/co/CoStreamFlatMap.java       |  54 +++
 .../api/operators/co/CoStreamGroupedReduce.java |  88 ++++
 .../streaming/api/operators/co/CoStreamMap.java |  54 +++
 .../api/operators/co/CoStreamOperator.java      | 155 +++++++
 .../api/operators/co/CoStreamReduce.java        |  70 +++
 .../api/operators/co/CoStreamWindow.java        | 197 ++++++++
 .../operators/windowing/EmptyWindowFilter.java  |  32 ++
 .../windowing/GroupedActiveDiscretizer.java     | 116 +++++
 .../windowing/GroupedStreamDiscretizer.java     | 128 ++++++
 .../windowing/GroupedWindowBuffer.java          |  73 +++
 .../windowing/ParallelGroupedMerge.java         |  41 ++
 .../api/operators/windowing/ParallelMerge.java  | 142 ++++++
 .../operators/windowing/StreamDiscretizer.java  | 223 +++++++++
 .../operators/windowing/StreamWindowBuffer.java |  67 +++
 .../operators/windowing/WindowFlattener.java    |  50 ++
 .../api/operators/windowing/WindowFolder.java   |  70 +++
 .../api/operators/windowing/WindowMapper.java   |  64 +++
 .../api/operators/windowing/WindowMerger.java   |  70 +++
 .../windowing/WindowPartExtractor.java          |  55 +++
 .../operators/windowing/WindowPartitioner.java  |  74 +++
 .../api/operators/windowing/WindowReducer.java  |  69 +++
 .../streaming/api/state/CircularFifoList.java   | 112 +++++
 .../api/state/NullableCircularBuffer.java       | 362 +++++++++++++++
 .../streaming/api/state/PartitionableState.java |  66 +++
 .../api/streamrecord/StreamRecord.java          | 134 ------
 .../streamrecord/StreamRecordSerializer.java    | 115 -----
 .../flink/streaming/api/streamrecord/UID.java   | 122 -----
 .../api/streamvertex/CoStreamVertex.java        | 143 ------
 .../api/streamvertex/InputHandler.java          |  94 ----
 .../api/streamvertex/OutputHandler.java         | 213 ---------
 .../api/streamvertex/StreamIterationHead.java   | 110 -----
 .../api/streamvertex/StreamIterationTail.java   | 114 -----
 .../api/streamvertex/StreamTaskContext.java     |  46 --
 .../api/streamvertex/StreamVertex.java          | 326 -------------
 .../api/streamvertex/StreamVertexException.java |  68 ---
 .../streamvertex/StreamingRuntimeContext.java   | 120 -----
 .../api/streamvertex/StreamingSuperstep.java    |  60 ---
 .../windowing/policy/ActiveTriggerPolicy.java   |  10 +-
 .../policy/CloneableEvictionPolicy.java         |   2 +-
 .../policy/CloneableTriggerPolicy.java          |   2 +-
 .../api/windowing/policy/EvictionPolicy.java    |   2 +-
 .../flink/streaming/io/BarrierBuffer.java       | 279 -----------
 .../flink/streaming/io/BlockingQueueBroker.java |  41 --
 .../flink/streaming/io/BufferSpiller.java       |  91 ----
 .../flink/streaming/io/CoReaderIterator.java    |  57 ---
 .../flink/streaming/io/CoRecordReader.java      | 289 ------------
 .../streaming/io/IndexedMutableReader.java      |  37 --
 .../streaming/io/IndexedReaderIterator.java     |  33 --
 .../flink/streaming/io/InputGateFactory.java    |  42 --
 .../flink/streaming/io/RecordWriterFactory.java |  52 ---
 .../apache/flink/streaming/io/SpillReader.java  |  78 ----
 .../streaming/io/SpillingBufferOrEvent.java     |  66 ---
 .../flink/streaming/io/StreamRecordWriter.java  |  88 ----
 .../io/StreamingAbstractRecordReader.java       | 133 ------
 .../io/StreamingMutableRecordReader.java        |  43 --
 .../flink/streaming/io/StreamingReader.java     |  27 --
 .../partitioner/BroadcastPartitioner.java       |  55 ---
 .../partitioner/DistributePartitioner.java      |  46 --
 .../partitioner/FieldsPartitioner.java          |  50 --
 .../partitioner/GlobalPartitioner.java          |  38 --
 .../partitioner/ShufflePartitioner.java         |  49 --
 .../partitioner/StreamPartitioner.java          |  44 --
 .../streaming/runtime/io/BarrierBuffer.java     | 279 +++++++++++
 .../runtime/io/BlockingQueueBroker.java         |  41 ++
 .../streaming/runtime/io/BufferSpiller.java     |  91 ++++
 .../streaming/runtime/io/CoReaderIterator.java  |  57 +++
 .../streaming/runtime/io/CoRecordReader.java    | 289 ++++++++++++
 .../runtime/io/IndexedMutableReader.java        |  37 ++
 .../runtime/io/IndexedReaderIterator.java       |  33 ++
 .../streaming/runtime/io/InputGateFactory.java  |  42 ++
 .../runtime/io/RecordWriterFactory.java         |  52 +++
 .../flink/streaming/runtime/io/SpillReader.java |  78 ++++
 .../runtime/io/SpillingBufferOrEvent.java       |  66 +++
 .../runtime/io/StreamRecordWriter.java          |  88 ++++
 .../io/StreamingAbstractRecordReader.java       | 133 ++++++
 .../io/StreamingMutableRecordReader.java        |  43 ++
 .../streaming/runtime/io/StreamingReader.java   |  27 ++
 .../partitioner/BroadcastPartitioner.java       |  55 +++
 .../partitioner/DistributePartitioner.java      |  46 ++
 .../runtime/partitioner/FieldsPartitioner.java  |  50 ++
 .../runtime/partitioner/GlobalPartitioner.java  |  38 ++
 .../runtime/partitioner/ShufflePartitioner.java |  49 ++
 .../runtime/partitioner/StreamPartitioner.java  |  44 ++
 .../runtime/streamrecord/StreamRecord.java      | 103 +++++
 .../streamrecord/StreamRecordSerializer.java    | 110 +++++
 .../streaming/runtime/tasks/CoStreamTask.java   | 143 ++++++
 .../streaming/runtime/tasks/InputHandler.java   |  94 ++++
 .../streaming/runtime/tasks/OutputHandler.java  | 213 +++++++++
 .../runtime/tasks/StreamIterationHead.java      | 110 +++++
 .../runtime/tasks/StreamIterationTail.java      | 114 +++++
 .../streaming/runtime/tasks/StreamTask.java     | 326 +++++++++++++
 .../runtime/tasks/StreamTaskContext.java        |  46 ++
 .../runtime/tasks/StreamTaskException.java      |  68 +++
 .../runtime/tasks/StreamingRuntimeContext.java  | 120 +++++
 .../runtime/tasks/StreamingSuperstep.java       |  60 +++
 .../flink/streaming/state/CircularFifoList.java | 112 -----
 .../streaming/state/NullableCircularBuffer.java | 362 ---------------
 .../streaming/state/PartitionableState.java     |  66 ---
 .../streaming/api/AggregationFunctionTest.java  |  62 +--
 .../flink/streaming/api/CoStreamTest.java       |   6 +-
 .../apache/flink/streaming/api/IterateTest.java |   2 +-
 .../apache/flink/streaming/api/SourceTest.java  |   4 +-
 .../flink/streaming/api/TypeFillTest.java       |   8 +-
 .../api/collector/DirectedOutputTest.java       |   2 +-
 .../api/collector/StreamCollectorTest.java      |   4 +-
 .../operator/CounterInvokableTest.java          |  39 --
 .../api/invokable/operator/FilterTest.java      |  51 ---
 .../api/invokable/operator/FlatMapTest.java     |  54 ---
 .../operator/GroupedFoldInvokableTest.java      |  67 ---
 .../operator/GroupedReduceInvokableTest.java    |  62 ---
 .../api/invokable/operator/MapTest.java         |  49 --
 .../api/invokable/operator/ProjectTest.java     |  67 ---
 .../api/invokable/operator/StreamFoldTest.java  |  56 ---
 .../invokable/operator/StreamReduceTest.java    |  54 ---
 .../invokable/operator/co/CoFlatMapTest.java    |  84 ----
 .../operator/co/CoGroupedReduceTest.java        | 125 -----
 .../api/invokable/operator/co/CoMapTest.java    |  57 ---
 .../operator/co/CoStreamReduceTest.java         |  71 ---
 .../api/invokable/operator/co/CoWindowTest.java | 182 --------
 .../operator/co/SelfConnectionTest.java         | 252 ----------
 .../windowing/GroupedStreamDiscretizerTest.java | 101 ----
 .../operator/windowing/ParallelMergeTest.java   | 119 -----
 .../windowing/StreamDiscretizerTest.java        | 117 -----
 .../operator/windowing/WindowFlattenerTest.java |  53 ---
 .../operator/windowing/WindowFolderTest.java    |  61 ---
 .../windowing/WindowIntegrationTest.java        | 408 -----------------
 .../operator/windowing/WindowMapperTest.java    |  60 ---
 .../operator/windowing/WindowMergerTest.java    |  75 ---
 .../windowing/WindowPartitionerTest.java        |  75 ---
 .../operator/windowing/WindowReducerTest.java   |  61 ---
 .../streaming/api/operators/CounterTest.java    |  40 ++
 .../streaming/api/operators/FilterTest.java     |  52 +++
 .../streaming/api/operators/FlatMapTest.java    |  55 +++
 .../api/operators/GroupedFoldTest.java          |  67 +++
 .../api/operators/GroupedReduceTest.java        |  63 +++
 .../flink/streaming/api/operators/MapTest.java  |  50 ++
 .../streaming/api/operators/ProjectTest.java    |  68 +++
 .../streaming/api/operators/StreamFoldTest.java |  57 +++
 .../api/operators/StreamReduceTest.java         |  55 +++
 .../api/operators/co/CoFlatMapTest.java         |  84 ++++
 .../api/operators/co/CoGroupedReduceTest.java   | 125 +++++
 .../streaming/api/operators/co/CoMapTest.java   |  57 +++
 .../api/operators/co/CoStreamReduceTest.java    |  71 +++
 .../api/operators/co/CoWindowTest.java          | 182 ++++++++
 .../api/operators/co/SelfConnectionTest.java    | 252 ++++++++++
 .../windowing/GroupedStreamDiscretizerTest.java | 104 +++++
 .../operators/windowing/ParallelMergeTest.java  | 121 +++++
 .../windowing/StreamDiscretizerTest.java        | 117 +++++
 .../windowing/WindowFlattenerTest.java          |  54 +++
 .../operators/windowing/WindowFolderTest.java   |  62 +++
 .../windowing/WindowIntegrationTest.java        | 408 +++++++++++++++++
 .../operators/windowing/WindowMapperTest.java   |  61 +++
 .../operators/windowing/WindowMergerTest.java   |  76 +++
 .../windowing/WindowPartitionerTest.java        |  76 +++
 .../operators/windowing/WindowReducerTest.java  |  62 +++
 .../streaming/api/state/OperatorStateTest.java  |  44 ++
 .../streaming/api/streamrecord/UIDTest.java     |  53 ---
 .../api/streamtask/MockRecordWriter.java        |  45 ++
 .../api/streamtask/StreamVertexTest.java        | 186 ++++++++
 .../api/streamvertex/MockRecordWriter.java      |  45 --
 .../api/streamvertex/StreamVertexTest.java      | 186 --------
 .../SlidingCountGroupedPreReducerTest.java      |   2 +-
 .../SlidingTimeGroupedPreReducerTest.java       |   2 +-
 .../flink/streaming/io/BarrierBufferIOTest.java | 159 -------
 .../flink/streaming/io/BarrierBufferTest.java   | 216 ---------
 .../flink/streaming/io/CoRecordReaderTest.java  |  90 ----
 .../streaming/io/SpillingBufferOrEventTest.java |  96 ----
 .../partitioner/BroadcastPartitionerTest.java   |  55 ---
 .../partitioner/DistributePartitionerTest.java  |  56 ---
 .../partitioner/FieldsPartitionerTest.java      |  76 ---
 .../partitioner/ForwardPartitionerTest.java     |  55 ---
 .../partitioner/GlobalPartitionerTest.java      |  50 --
 .../partitioner/ShufflePartitionerTest.java     |  60 ---
 .../runtime/io/BarrierBufferIOTest.java         | 160 +++++++
 .../streaming/runtime/io/BarrierBufferTest.java | 217 +++++++++
 .../runtime/io/CoRecordReaderTest.java          |  92 ++++
 .../runtime/io/SpillingBufferOrEventTest.java   |  99 ++++
 .../partitioner/BroadcastPartitionerTest.java   |  56 +++
 .../partitioner/DistributePartitionerTest.java  |  57 +++
 .../partitioner/FieldsPartitionerTest.java      |  77 ++++
 .../partitioner/ForwardPartitionerTest.java     |  56 +++
 .../partitioner/GlobalPartitionerTest.java      |  51 +++
 .../partitioner/ShufflePartitionerTest.java     |  61 +++
 .../streaming/state/OperatorStateTest.java      |  44 --
 .../flink/streaming/util/MockCoContext.java     |  26 +-
 .../flink/streaming/util/MockContext.java       |  26 +-
 .../streaming/util/MockRecordWriterFactory.java |   2 +-
 .../apache/flink/streaming/util/MockSource.java |   2 +-
 .../streaming/util/TestListResultSink.java      |   2 +-
 .../streaming/util/TestStreamEnvironment.java   |   2 -
 .../examples/iteration/IterateExample.java      |   2 +-
 .../streaming/examples/join/WindowJoin.java     |   4 +-
 .../ml/IncrementalLearningSkeleton.java         |   6 +-
 .../examples/windowing/SessionWindowing.java    |   2 +-
 .../examples/windowing/StockPrices.java         |   4 +-
 .../windowing/TopSpeedWindowingExample.java     |   2 +-
 .../scala/ScalaStreamingAggregator.java         |   6 +-
 .../api/scala/ConnectedDataStream.scala         |  16 +-
 .../flink/streaming/api/scala/DataStream.scala  |  41 +-
 .../api/scala/StreamCrossOperator.scala         |  16 +-
 .../api/scala/StreamExecutionEnvironment.scala  |   4 +-
 .../api/scala/StreamJoinOperator.scala          |  12 +-
 .../streaming/api/scala/TemporalOperator.scala  |   4 +-
 .../api/scala/WindowedDataStream.scala          |   6 +-
 .../java/table/JavaStreamingTranslator.scala    |   7 +-
 .../StreamCheckpointingITCase.java              |   4 +-
 .../test/classloading/jar/StreamingProgram.java |   2 +-
 .../ProcessFailureStreamingRecoveryITCase.java  |   6 +-
 380 files changed, 15525 insertions(+), 15707 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
index a7b0b06..1e645cd 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
-import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
 public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 8112159..50f5770 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index b1e0f0b..e13ff72 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 376d96f..85c4d35 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -22,7 +22,7 @@ import java.util.Properties;
 
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
 import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 7cd8a28..cb89248 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -21,9 +21,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.ConnectorSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
 import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaIdleConsumerIterator;
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffs
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 53db1c8..fa729d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 import java.io.IOException;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index 740907f..00ec156 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -25,8 +25,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index a32fe1b..e500fef 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.twitter;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
 import org.apache.flink.util.Collector;
 import org.apache.sling.commons.json.JSONException;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index b416839..8205799 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -37,8 +37,8 @@ import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JSONGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JSONGenerator.java
deleted file mode 100644
index ad9326a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JSONGenerator.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.sling.commons.json.JSONArray;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-
-public class JSONGenerator {
-
-	public static final String STEPS = "step_function";
-	public static final String ID = "id";
-	public static final String SIDE = "side";
-	public static final String SHIP_STRATEGY = "ship_strategy";
-	public static final String PREDECESSORS = "predecessors";
-	public static final String TYPE = "type";
-	public static final String PACT = "pact";
-	public static final String CONTENTS = "contents";
-	public static final String PARALLELISM = "parallelism";
-
-	private StreamGraph streamGraph;
-
-	public JSONGenerator(StreamGraph streamGraph) {
-		this.streamGraph = streamGraph;
-	}
-
-	public String getJSON() throws JSONException {
-		JSONObject json = new JSONObject();
-		JSONArray nodes = new JSONArray();
-		json.put("nodes", nodes);
-		List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
-		Collections.sort(operatorIDs);
-		visit(nodes, operatorIDs, new HashMap<Integer, Integer>());
-		return json.toString();
-	}
-
-	private void visit(JSONArray jsonArray, List<Integer> toVisit,
-			Map<Integer, Integer> edgeRemapings) throws JSONException {
-
-		Integer vertexID = toVisit.get(0);
-		StreamNode vertex = streamGraph.getVertex(vertexID);
-
-		if (streamGraph.getSourceIDs().contains(vertexID)
-				|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
-
-			JSONObject node = new JSONObject();
-			decorateNode(vertexID, node);
-
-			if (!streamGraph.getSourceIDs().contains(vertexID)) {
-				JSONArray inputs = new JSONArray();
-				node.put(PREDECESSORS, inputs);
-
-				for (StreamEdge inEdge : vertex.getInEdges()) {
-					int inputID = inEdge.getSourceID();
-
-					Integer mappedID = (edgeRemapings.keySet().contains(inputID)) ? edgeRemapings
-							.get(inputID) : inputID;
-					decorateEdge(inputs, vertexID, mappedID, inputID);
-				}
-			}
-			jsonArray.put(node);
-			toVisit.remove(vertexID);
-		} else {
-			Integer iterationHead = -1;
-			for (StreamEdge inEdge : vertex.getInEdges()) {
-				int operator = inEdge.getSourceID();
-
-				if (streamGraph.vertexIDtoLoop.containsKey(operator)) {
-					iterationHead = operator;
-				}
-			}
-
-			JSONObject obj = new JSONObject();
-			JSONArray iterationSteps = new JSONArray();
-			obj.put(STEPS, iterationSteps);
-			obj.put(ID, iterationHead);
-			obj.put(PACT, "IterativeDataStream");
-			obj.put(PARALLELISM, streamGraph.getVertex(iterationHead).getParallelism());
-			obj.put(CONTENTS, "Stream Iteration");
-			JSONArray iterationInputs = new JSONArray();
-			obj.put(PREDECESSORS, iterationInputs);
-			toVisit.remove(iterationHead);
-			visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
-			jsonArray.put(obj);
-		}
-
-		if (!toVisit.isEmpty()) {
-			visit(jsonArray, toVisit, edgeRemapings);
-		}
-	}
-
-	private void visitIteration(JSONArray jsonArray, List<Integer> toVisit, int headId,
-			Map<Integer, Integer> edgeRemapings, JSONArray iterationInEdges) throws JSONException {
-
-		Integer vertexID = toVisit.get(0);
-		StreamNode vertex = streamGraph.getVertex(vertexID);
-		toVisit.remove(vertexID);
-
-		// Ignoring head and tail to avoid redundancy
-		if (!streamGraph.vertexIDtoLoop.containsKey(vertexID)) {
-			JSONObject obj = new JSONObject();
-			jsonArray.put(obj);
-			decorateNode(vertexID, obj);
-			JSONArray inEdges = new JSONArray();
-			obj.put(PREDECESSORS, inEdges);
-
-			for (StreamEdge inEdge : vertex.getInEdges()) {
-				int inputID = inEdge.getSourceID();
-
-				if (edgeRemapings.keySet().contains(inputID)) {
-					decorateEdge(inEdges, vertexID, inputID, inputID);
-				} else if (!streamGraph.vertexIDtoLoop.containsKey(inputID)) {
-					decorateEdge(iterationInEdges, vertexID, inputID, inputID);
-				}
-			}
-
-			edgeRemapings.put(vertexID, headId);
-			visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
-		}
-
-	}
-
-	private void decorateEdge(JSONArray inputArray, int vertexID, int mappedInputID, int inputID)
-			throws JSONException {
-		JSONObject input = new JSONObject();
-		inputArray.put(input);
-		input.put(ID, mappedInputID);
-		input.put(SHIP_STRATEGY, streamGraph.getEdge(inputID, vertexID).getPartitioner()
-				.getStrategy());
-		input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
-	}
-
-	private void decorateNode(Integer vertexID, JSONObject node) throws JSONException {
-
-		StreamNode vertex = streamGraph.getVertex(vertexID);
-
-		node.put(ID, vertexID);
-		node.put(TYPE, vertex.getOperatorName());
-
-		if (streamGraph.getSourceIDs().contains(vertexID)) {
-			node.put(PACT, "Data Source");
-		} else {
-			node.put(PACT, "Data Stream");
-		}
-
-		StreamInvokable<?, ?> invokable = streamGraph.getVertex(vertexID).getInvokable();
-
-		if (invokable != null && invokable.getUserFunction() != null) {
-			node.put(CONTENTS, vertex.getOperatorName() + " at "
-					+ invokable.getUserFunction().getClass().getSimpleName());
-		} else {
-			node.put(CONTENTS, vertex.getOperatorName());
-		}
-
-		node.put(PARALLELISM, streamGraph.getVertex(vertexID).getParallelism());
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
deleted file mode 100644
index b04e756..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
-import org.apache.flink.util.InstantiationUtil;
-
-public class StreamConfig implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
-	private static final String NUMBER_OF_INPUTS = "numberOfInputs";
-	private static final String CHAINED_OUTPUTS = "chainedOutputs";
-	private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
-	private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
-	private static final String OUTPUT_NAME = "outputName_";
-	private static final String VERTEX_NAME = "vertexID";
-	private static final String OPERATOR_NAME = "operatorName";
-	private static final String ITERATION_ID = "iterationId";
-	private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
-	private static final String SERIALIZEDUDF = "serializedUDF";
-	private static final String USER_FUNCTION = "userFunction";
-	private static final String BUFFER_TIMEOUT = "bufferTimeout";
-	private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
-	private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
-	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
-	private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
-	private static final String ITERATON_WAIT = "iterationWait";
-	private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
-	private static final String EDGES_IN_ORDER = "edgesInOrder";
-	private static final String OUT_STREAM_EDGES = "outStreamEdges";
-	private static final String IN_STREAM_EDGES = "inStreamEdges";
-
-	// DEFAULT VALUES
-	private static final long DEFAULT_TIMEOUT = 100;
-	public static final String STATE_MONITORING = "STATE_MONITORING";
-
-	// CONFIG METHODS
-
-	private Configuration config;
-
-	public StreamConfig(Configuration config) {
-		this.config = config;
-	}
-
-	public Configuration getConfiguration() {
-		return config;
-	}
-
-	public void setVertexID(Integer vertexID) {
-		config.setInteger(VERTEX_NAME, vertexID);
-	}
-
-	public Integer getVertexID() {
-		return config.getInteger(VERTEX_NAME, -1);
-	}
-
-	public void setOperatorName(String name) {
-		config.setString(OPERATOR_NAME, name);
-	}
-
-	public String getOperatorName() {
-		return config.getString(OPERATOR_NAME, "Missing");
-	}
-
-	public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
-		setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
-	}
-
-	public void setTypeSerializerIn2(StreamRecordSerializer<?> serializer) {
-		setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
-	}
-
-	public void setTypeSerializerOut1(StreamRecordSerializer<?> serializer) {
-		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
-	}
-
-	public void setTypeSerializerOut2(StreamRecordSerializer<?> serializer) {
-		setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
-	}
-
-	@SuppressWarnings("unchecked")
-	public <T> StreamRecordSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
-		try {
-			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
-					TYPE_SERIALIZER_IN_1, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate serializer.");
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public <T> StreamRecordSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
-		try {
-			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
-					TYPE_SERIALIZER_IN_2, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate serializer.");
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public <T> StreamRecordSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
-		try {
-			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
-					TYPE_SERIALIZER_OUT_1, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate serializer.");
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public <T> StreamRecordSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
-		try {
-			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
-					TYPE_SERIALIZER_OUT_2, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate serializer.");
-		}
-	}
-
-	private void setTypeSerializer(String key, StreamRecordSerializer<?> typeWrapper) {
-		config.setBytes(key, SerializationUtils.serialize(typeWrapper));
-	}
-
-	public void setBufferTimeout(long timeout) {
-		config.setLong(BUFFER_TIMEOUT, timeout);
-	}
-
-	public long getBufferTimeout() {
-		return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
-	}
-
-	public void setUserInvokable(StreamInvokable<?, ?> invokableObject) {
-		if (invokableObject != null) {
-			config.setClass(USER_FUNCTION, invokableObject.getClass());
-
-			try {
-				config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject));
-			} catch (SerializationException e) {
-				throw new RuntimeException("Cannot serialize invokable object "
-						+ invokableObject.getClass(), e);
-			}
-		}
-	}
-
-	@SuppressWarnings({ "unchecked" })
-	public <T> T getUserInvokable(ClassLoader cl) {
-		try {
-			return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
-		} catch (Exception e) {
-			throw new StreamVertexException("Cannot instantiate user function", e);
-		}
-	}
-
-	public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) {
-		try {
-			config.setBytes(OUTPUT_SELECTOR_WRAPPER, SerializationUtils.serialize(outputSelectorWrapper));
-		} catch (SerializationException e) {
-			throw new RuntimeException("Cannot serialize OutputSelectorWrapper");
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
-		try {
-			return (OutputSelectorWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config,
-					OUTPUT_SELECTOR_WRAPPER, cl);
-		} catch (Exception e) {
-			throw new StreamVertexException("Cannot deserialize and instantiate OutputSelectorWrapper", e);
-		}
-	}
-
-	public void setIterationId(Integer iterationId) {
-		config.setInteger(ITERATION_ID, iterationId);
-	}
-
-	public Integer getIterationId() {
-		return config.getInteger(ITERATION_ID, 0);
-	}
-
-	public void setIterationWaitTime(long time) {
-		config.setLong(ITERATON_WAIT, time);
-	}
-
-	public long getIterationWaitTime() {
-		return config.getLong(ITERATON_WAIT, 0);
-	}
-
-	public void setSelectedNames(Integer output, List<String> selected) {
-		if (selected != null) {
-			config.setBytes(OUTPUT_NAME + output,
-					SerializationUtils.serialize((Serializable) selected));
-		} else {
-			config.setBytes(OUTPUT_NAME + output,
-					SerializationUtils.serialize(new ArrayList<String>()));
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public List<String> getSelectedNames(Integer output) {
-		return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output,
-				null));
-	}
-
-	public void setNumberOfInputs(int numberOfInputs) {
-		config.setInteger(NUMBER_OF_INPUTS, numberOfInputs);
-	}
-
-	public int getNumberOfInputs() {
-		return config.getInteger(NUMBER_OF_INPUTS, 0);
-	}
-
-	public void setNumberOfOutputs(int numberOfOutputs) {
-		config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs);
-	}
-
-	public int getNumberOfOutputs() {
-		return config.getInteger(NUMBER_OF_OUTPUTS, 0);
-	}
-
-	public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
-		config.setBytes(NONCHAINED_OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs));
-	}
-
-	@SuppressWarnings("unchecked")
-	public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
-		try {
-			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate outputs.");
-		}
-	}
-
-	public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
-		config.setBytes(CHAINED_OUTPUTS,
-				SerializationUtils.serialize((Serializable) chainedOutputs));
-	}
-
-	@SuppressWarnings("unchecked")
-	public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
-		try {
-			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config,
-					CHAINED_OUTPUTS, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate chained outputs.");
-		}
-	}
-
-	public void setOutEdges(List<StreamEdge> outEdges) {
-		config.setBytes(OUT_STREAM_EDGES, SerializationUtils.serialize((Serializable) outEdges));
-	}
-
-	@SuppressWarnings("unchecked")
-	public List<StreamEdge> getOutEdges(ClassLoader cl) {
-		try {
-			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
-					this.config, OUT_STREAM_EDGES, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate outputs.");
-		}
-	}
-
-	public void setInPhysicalEdges(List<StreamEdge> inEdges) {
-		config.setBytes(IN_STREAM_EDGES, SerializationUtils.serialize((Serializable) inEdges));
-	}
-
-	@SuppressWarnings("unchecked")
-	public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
-		try {
-			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
-					this.config, IN_STREAM_EDGES, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate inputs.");
-		}
-	}
-
-	public void setStateMonitoring(boolean stateMonitoring) {
-
-		config.setBoolean(STATE_MONITORING, stateMonitoring);
-
-	}
-
-	public boolean getStateMonitoring()
-	{
-		return config.getBoolean(STATE_MONITORING, false);
-	}
-
-	public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
-		config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
-	}
-
-	@SuppressWarnings("unchecked")
-	public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
-		try {
-			return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
-					this.config, EDGES_IN_ORDER, cl);
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate outputs.");
-		}
-	}
-
-	public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
-		config.setBytes(CHAINED_TASK_CONFIG,
-				SerializationUtils.serialize((Serializable) chainedTaskConfigs));
-	}
-
-	@SuppressWarnings("unchecked")
-	public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
-		try {
-
-			Map<Integer, StreamConfig> confs = (Map<Integer, StreamConfig>) InstantiationUtil
-					.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
-
-			return confs == null ? new HashMap<Integer, StreamConfig>() : confs;
-		} catch (Exception e) {
-			throw new RuntimeException("Could not instantiate configuration.");
-		}
-	}
-
-	public void setChainStart() {
-		config.setBoolean(IS_CHAINED_VERTEX, true);
-	}
-
-	public boolean isChainStart() {
-		return config.getBoolean(IS_CHAINED_VERTEX, false);
-	}
-
-	@Override
-	public String toString() {
-
-		ClassLoader cl = getClass().getClassLoader();
-
-		StringBuilder builder = new StringBuilder();
-		builder.append("\n=======================");
-		builder.append("Stream Config");
-		builder.append("=======================");
-		builder.append("\nTask name: " + getVertexID());
-		builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
-		builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
-		builder.append("\nOutput names: " + getNonChainedOutputs(cl));
-		builder.append("\nPartitioning:");
-		for (StreamEdge output : getNonChainedOutputs(cl)) {
-			int outputname = output.getTargetID();
-			builder.append("\n\t" + outputname + ": " + output.getPartitioner());
-		}
-
-		builder.append("\nChained subtasks: " + getChainedOutputs(cl));
-
-		try {
-			builder.append("\nInvokable: " + getUserInvokable(cl).getClass().getSimpleName());
-		} catch (Exception e) {
-			builder.append("\nInvokable: Missing");
-		}
-		builder.append("\nBuffer timeout: " + getBufferTimeout());
-		builder.append("\nState Monitoring: " + getStateMonitoring());
-		if (isChainStart() && getChainedOutputs(cl).size() > 0) {
-			builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
-			builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
-		}
-
-		return builder.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
deleted file mode 100644
index 066f320..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-
-/**
- * An edge in the streaming topology. One edge like this does not necessarily
- * gets converted to a connection between two job vertices (due to
- * chaining/optimization).
- */
-public class StreamEdge implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	final private String edgeId;
-
-	final private StreamNode sourceVertex;
-	final private StreamNode targetVertex;
-
-	/**
-	 * The type number of the input for co-tasks.
-	 */
-	final private int typeNumber;
-
-	/**
-	 * A list of output names that the target vertex listens to (if there is
-	 * output selection).
-	 */
-	final private List<String> selectedNames;
-	final private StreamPartitioner<?> outputPartitioner;
-
-	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
-			List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
-		this.sourceVertex = sourceVertex;
-		this.targetVertex = targetVertex;
-		this.typeNumber = typeNumber;
-		this.selectedNames = selectedNames;
-		this.outputPartitioner = outputPartitioner;
-
-		this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
-				+ "_" + outputPartitioner;
-	}
-
-	public StreamNode getSourceVertex() {
-		return sourceVertex;
-	}
-
-	public StreamNode getTargetVertex() {
-		return targetVertex;
-	}
-
-	public int getSourceID() {
-		return sourceVertex.getID();
-	}
-
-	public int getTargetID() {
-		return targetVertex.getID();
-	}
-
-	public int getTypeNumber() {
-		return typeNumber;
-	}
-
-	public List<String> getSelectedNames() {
-		return selectedNames;
-	}
-
-	public StreamPartitioner<?> getPartitioner() {
-		return outputPartitioner;
-	}
-
-	@Override
-	public int hashCode() {
-		return edgeId.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		StreamEdge that = (StreamEdge) o;
-
-		if (!edgeId.equals(that.edgeId)) {
-			return false;
-		}
-
-		return true;
-	}
-
-	@Override
-	public String toString() {
-		return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
-				+ ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
-				+ ')';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
deleted file mode 100644
index 9e4a7e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
-import org.apache.flink.streaming.api.streamvertex.StreamVertex;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.sling.commons.json.JSONException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class representing the streaming topology. It contains all the information
- * necessary to build the jobgraph for the execution.
- * 
- */
-public class StreamGraph extends StreamingPlan {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
-	private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
-	private String jobName = DEAFULT_JOB_NAME;
-
-	private final StreamExecutionEnvironment environemnt;
-	private final ExecutionConfig executionConfig;
-
-	private boolean checkpointingEnabled = false;
-	private long checkpointingInterval = 5000;
-	private boolean chaining = true;
-
-	private final Map<Integer, StreamNode> streamNodes;
-	private final Set<Integer> sources;
-
-	private final Map<Integer, StreamLoop> streamLoops;
-	protected final Map<Integer, StreamLoop> vertexIDtoLoop;
-
-	public StreamGraph(StreamExecutionEnvironment environment) {
-
-		this.environemnt = environment;
-		executionConfig = environment.getConfig();
-
-		streamNodes = new HashMap<Integer, StreamNode>();
-		streamLoops = new HashMap<Integer, StreamLoop>();
-		vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>();
-		sources = new HashSet<Integer>();
-	}
-
-	protected ExecutionConfig getExecutionConfig() {
-		return executionConfig;
-	}
-
-	public void setJobName(String jobName) {
-		this.jobName = jobName;
-	}
-
-	public void setChaining(boolean chaining) {
-		this.chaining = chaining;
-	}
-
-	public void setCheckpointingEnabled(boolean checkpointingEnabled) {
-		this.checkpointingEnabled = checkpointingEnabled;
-	}
-
-	public void setCheckpointingInterval(long checkpointingInterval) {
-		this.checkpointingInterval = checkpointingInterval;
-	}
-
-	public long getCheckpointingInterval() {
-		return checkpointingInterval;
-	}
-
-	public boolean isChainingEnabled() {
-		return chaining;
-	}
-
-	public boolean isCheckpointingEnabled() {
-		return checkpointingEnabled;
-	}
-
-	public boolean isIterative() {
-		return !streamLoops.isEmpty();
-	}
-
-	public <IN, OUT> void addSource(Integer vertexID, StreamInvokable<IN, OUT> invokableObject,
-			TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
-		addOperator(vertexID, invokableObject, inTypeInfo, outTypeInfo, operatorName);
-		sources.add(vertexID);
-	}
-
-	public <IN, OUT> void addOperator(Integer vertexID, StreamInvokable<IN, OUT> invokableObject,
-			TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
-
-		addNode(vertexID, StreamVertex.class, invokableObject, operatorName);
-
-		StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
-				inTypeInfo, executionConfig) : null;
-
-		StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
-				&& !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
-				outTypeInfo, executionConfig) : null;
-
-		setSerializers(vertexID, inSerializer, null, outSerializer);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Vertex: {}", vertexID);
-		}
-	}
-
-	public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
-			CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo,
-			TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
-
-		addNode(vertexID, CoStreamVertex.class, taskInvokableObject, operatorName);
-
-		StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
-				&& !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
-				outTypeInfo, executionConfig) : null;
-
-		setSerializers(vertexID, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
-				new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), outSerializer);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("CO-TASK: {}", vertexID);
-		}
-	}
-
-	public void addIterationHead(Integer vertexID, Integer iterationHead, Integer iterationID,
-			long timeOut) {
-
-		addNode(vertexID, StreamIterationHead.class, null, null);
-
-		chaining = false;
-
-		StreamLoop iteration = new StreamLoop(iterationID, getVertex(iterationHead), timeOut);
-		streamLoops.put(iterationID, iteration);
-		vertexIDtoLoop.put(vertexID, iteration);
-
-		setSerializersFrom(iterationHead, vertexID);
-		getVertex(vertexID).setOperatorName("IterationHead-" + iterationHead);
-
-		int outpartitionerIndex = getVertex(iterationHead).getInEdgeIndices().get(0);
-		StreamPartitioner<?> outputPartitioner = getVertex(outpartitionerIndex).getOutEdges()
-				.get(0).getPartitioner();
-
-		addEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("ITERATION SOURCE: {}", vertexID);
-		}
-
-		sources.add(vertexID);
-	}
-
-	public void addIterationTail(Integer vertexID, Integer iterationTail, Integer iterationID,
-			long waitTime) {
-
-		if (getVertex(iterationTail).getBufferTimeout() == 0) {
-			throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
-		}
-
-		addNode(vertexID, StreamIterationTail.class, null, null).setParallelism(
-				getVertex(iterationTail).getParallelism());
-
-		StreamLoop iteration = streamLoops.get(iterationID);
-		iteration.setTail(getVertex(iterationTail));
-		vertexIDtoLoop.put(vertexID, iteration);
-
-		setSerializersFrom(iterationTail, vertexID);
-		getVertex(vertexID).setOperatorName("IterationTail-" + iterationTail);
-
-		setParallelism(iteration.getHead().getID(), getVertex(iterationTail).getParallelism());
-		setBufferTimeout(iteration.getHead().getID(), getVertex(iterationTail).getBufferTimeout());
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("ITERATION SINK: {}", vertexID);
-		}
-
-	}
-
-	protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
-			StreamInvokable<?, ?> invokableObject, String operatorName) {
-
-		StreamNode vertex = new StreamNode(environemnt, vertexID, invokableObject, operatorName,
-				new ArrayList<OutputSelector<?>>(), vertexClass);
-
-		streamNodes.put(vertexID, vertex);
-
-		return vertex;
-	}
-
-	public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID,
-			StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {
-
-		StreamEdge edge = new StreamEdge(getVertex(upStreamVertexID),
-				getVertex(downStreamVertexID), typeNumber, outputNames, partitionerObject);
-		getVertex(edge.getSourceID()).addOutEdge(edge);
-		getVertex(edge.getTargetID()).addInEdge(edge);
-	}
-
-	public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
-		getVertex(vertexID).addOutputSelector(outputSelector);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Outputselector set for {}", vertexID);
-		}
-
-	}
-
-	public void setParallelism(Integer vertexID, int parallelism) {
-		getVertex(vertexID).setParallelism(parallelism);
-	}
-
-	public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
-		getVertex(vertexID).setBufferTimeout(bufferTimeout);
-	}
-
-	private void setSerializers(Integer vertexID, StreamRecordSerializer<?> in1,
-			StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out) {
-		StreamNode vertex = getVertex(vertexID);
-		vertex.setSerializerIn1(in1);
-		vertex.setSerializerIn2(in2);
-		vertex.setSerializerOut(out);
-	}
-
-	private void setSerializersFrom(Integer from, Integer to) {
-		StreamNode fromVertex = getVertex(from);
-		StreamNode toVertex = getVertex(to);
-
-		toVertex.setSerializerIn1(fromVertex.getTypeSerializerOut());
-		toVertex.setSerializerOut(fromVertex.getTypeSerializerIn1());
-	}
-
-	public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) {
-		StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType,
-				executionConfig);
-		getVertex(vertexID).setSerializerOut(serializer);
-	}
-
-	public <IN, OUT> void setInvokable(Integer vertexID, StreamInvokable<IN, OUT> invokableObject) {
-		getVertex(vertexID).setInvokable(invokableObject);
-	}
-
-	public void setInputFormat(Integer vertexID, InputFormat<String, ?> inputFormat) {
-		getVertex(vertexID).setInputFormat(inputFormat);
-	}
-
-	public StreamNode getVertex(Integer vertexID) {
-		return streamNodes.get(vertexID);
-	}
-
-	protected Collection<? extends Integer> getVertexIDs() {
-		return streamNodes.keySet();
-	}
-
-	protected StreamEdge getEdge(int sourceId, int targetId) {
-		Iterator<StreamEdge> outIterator = getVertex(sourceId).getOutEdges().iterator();
-		while (outIterator.hasNext()) {
-			StreamEdge edge = outIterator.next();
-
-			if (edge.getTargetID() == targetId) {
-				return edge;
-			}
-		}
-
-		throw new RuntimeException("No such edge in stream graph: " + sourceId + " -> " + targetId);
-	}
-
-	public Collection<Integer> getSourceIDs() {
-		return sources;
-	}
-
-	public Set<Tuple2<Integer, StreamInvokable<?, ?>>> getInvokables() {
-		Set<Tuple2<Integer, StreamInvokable<?, ?>>> invokableSet = new HashSet<Tuple2<Integer, StreamInvokable<?, ?>>>();
-		for (StreamNode vertex : streamNodes.values()) {
-			invokableSet.add(new Tuple2<Integer, StreamInvokable<?, ?>>(vertex.getID(), vertex
-					.getInvokable()));
-		}
-		return invokableSet;
-	}
-
-	public Collection<StreamLoop> getStreamLoops() {
-		return streamLoops.values();
-	}
-
-	public Integer getLoopID(Integer vertexID) {
-		return vertexIDtoLoop.get(vertexID).getID();
-	}
-
-	public long getLoopTimeout(Integer vertexID) {
-		return vertexIDtoLoop.get(vertexID).getTimeout();
-	}
-
-	protected void removeEdge(StreamEdge edge) {
-
-		edge.getSourceVertex().getOutEdges().remove(edge);
-		edge.getTargetVertex().getInEdges().remove(edge);
-
-	}
-
-	protected void removeVertex(StreamNode toRemove) {
-
-		Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>();
-
-		edgesToRemove.addAll(toRemove.getInEdges());
-		edgesToRemove.addAll(toRemove.getOutEdges());
-
-		for (StreamEdge edge : edgesToRemove) {
-			removeEdge(edge);
-		}
-		streamNodes.remove(toRemove.getID());
-	}
-
-	/**
-	 * Gets the assembled {@link JobGraph} and adds a default name for it.
-	 */
-	public JobGraph getJobGraph() {
-		return getJobGraph(jobName);
-	}
-
-	/**
-	 * Gets the assembled {@link JobGraph} and adds a user specified name for
-	 * it.
-	 * 
-	 * @param jobGraphName
-	 *            name of the jobGraph
-	 */
-	public JobGraph getJobGraph(String jobGraphName) {
-
-		// temporarily forbid checkpointing for iterative jobs
-		if (isIterative() && isCheckpointingEnabled()) {
-			throw new UnsupportedOperationException(
-					"Checkpointing is currently not supported for iterative jobs!");
-		}
-
-		setJobName(jobGraphName);
-
-		WindowingOptimizer.optimizeGraph(this);
-
-		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
-
-		return jobgraphGenerator.createJobGraph(jobGraphName);
-	}
-
-	@Override
-	public String getStreamingPlanAsJSON() {
-
-		WindowingOptimizer.optimizeGraph(this);
-
-		try {
-			return new JSONGenerator(this).getJSON();
-		} catch (JSONException e) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("JSON plan creation failed: {}", e);
-			}
-			return "";
-		}
-
-	}
-
-	@Override
-	public void dumpStreamingPlanAsJSON(File file) throws IOException {
-		PrintWriter pw = null;
-		try {
-			pw = new PrintWriter(new FileOutputStream(file), false);
-			pw.write(getStreamingPlanAsJSON());
-			pw.flush();
-
-		} finally {
-			if (pw != null) {
-				pw.close();
-			}
-		}
-	}
-
-	/**
-	 * Object for representing loops in streaming programs.
-	 * 
-	 */
-	protected static class StreamLoop {
-
-		private Integer loopID;
-
-		private StreamNode head;
-		private StreamNode tail;
-
-		private Long timeout;
-
-		public StreamLoop(Integer loopID, StreamNode head, Long timeout) {
-			this.loopID = loopID;
-			this.head = head;
-			this.timeout = timeout;
-		}
-
-		public Integer getID() {
-			return loopID;
-		}
-
-		public Long getTimeout() {
-			return timeout;
-		}
-
-		public void setTail(StreamNode tail) {
-			this.tail = tail;
-		}
-
-		public StreamNode getHead() {
-			return head;
-		}
-
-		public StreamNode getTail() {
-			return tail;
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamNode.java
deleted file mode 100644
index 86b4782..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamNode.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-
-/**
- * Class representing the operators in the streaming programs, with all their
- * properties.
- * 
- */
-public class StreamNode implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	transient private StreamExecutionEnvironment env;
-
-	private Integer ID;
-	private Integer parallelism = null;
-	private Long bufferTimeout = null;
-	private String operatorName;
-
-	private StreamInvokable<?, ?> invokable;
-	private List<OutputSelector<?>> outputSelectors;
-	private StreamRecordSerializer<?> typeSerializerIn1;
-	private StreamRecordSerializer<?> typeSerializerIn2;
-	private StreamRecordSerializer<?> typeSerializerOut;
-
-	private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
-	private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
-
-	private Class<? extends AbstractInvokable> jobVertexClass;
-
-	private InputFormat<String, ?> inputFormat;
-
-	public StreamNode(StreamExecutionEnvironment env, Integer ID, StreamInvokable<?, ?> invokable,
-			String operatorName, List<OutputSelector<?>> outputSelector,
-			Class<? extends AbstractInvokable> jobVertexClass) {
-		this.env = env;
-		this.ID = ID;
-		this.operatorName = operatorName;
-		this.invokable = invokable;
-		this.outputSelectors = outputSelector;
-		this.jobVertexClass = jobVertexClass;
-	}
-
-	public void addInEdge(StreamEdge inEdge) {
-		if (inEdge.getTargetID() != getID()) {
-			throw new IllegalArgumentException("Destination ID doesn't match the StreamNode ID");
-		} else {
-			inEdges.add(inEdge);
-		}
-	}
-
-	public void addOutEdge(StreamEdge outEdge) {
-		if (outEdge.getSourceID() != getID()) {
-			throw new IllegalArgumentException("Source ID doesn't match the StreamNode ID");
-		} else {
-			outEdges.add(outEdge);
-		}
-	}
-
-	public List<StreamEdge> getOutEdges() {
-		return outEdges;
-	}
-
-	public List<StreamEdge> getInEdges() {
-		return inEdges;
-	}
-
-	public List<Integer> getOutEdgeIndices() {
-		List<Integer> outEdgeIndices = new ArrayList<Integer>();
-
-		for (StreamEdge edge : outEdges) {
-			outEdgeIndices.add(edge.getTargetID());
-		}
-
-		return outEdgeIndices;
-	}
-
-	public List<Integer> getInEdgeIndices() {
-		List<Integer> inEdgeIndices = new ArrayList<Integer>();
-
-		for (StreamEdge edge : inEdges) {
-			inEdgeIndices.add(edge.getSourceID());
-		}
-
-		return inEdgeIndices;
-	}
-
-	public Integer getID() {
-		return ID;
-	}
-
-	public Integer getParallelism() {
-		return parallelism != null ? parallelism : env.getParallelism();
-	}
-
-	public void setParallelism(Integer parallelism) {
-		this.parallelism = parallelism;
-	}
-
-	public Long getBufferTimeout() {
-		return bufferTimeout != null ? bufferTimeout : env.getBufferTimeout();
-	}
-
-	public void setBufferTimeout(Long bufferTimeout) {
-		this.bufferTimeout = bufferTimeout;
-	}
-
-	public StreamInvokable<?, ?> getInvokable() {
-		return invokable;
-	}
-
-	public void setInvokable(StreamInvokable<?, ?> invokable) {
-		this.invokable = invokable;
-	}
-
-	public String getOperatorName() {
-		return operatorName;
-	}
-
-	public void setOperatorName(String operatorName) {
-		this.operatorName = operatorName;
-	}
-
-	public List<OutputSelector<?>> getOutputSelectors() {
-		return outputSelectors;
-	}
-
-	public OutputSelectorWrapper<?> getOutputSelectorWrapper() {
-		return OutputSelectorWrapperFactory.create(getOutputSelectors());
-	}
-
-	public void addOutputSelector(OutputSelector<?> outputSelector) {
-		this.outputSelectors.add(outputSelector);
-	}
-
-	public StreamRecordSerializer<?> getTypeSerializerIn1() {
-		return typeSerializerIn1;
-	}
-
-	public void setSerializerIn1(StreamRecordSerializer<?> typeSerializerIn1) {
-		this.typeSerializerIn1 = typeSerializerIn1;
-	}
-
-	public StreamRecordSerializer<?> getTypeSerializerIn2() {
-		return typeSerializerIn2;
-	}
-
-	public void setSerializerIn2(StreamRecordSerializer<?> typeSerializerIn2) {
-		this.typeSerializerIn2 = typeSerializerIn2;
-	}
-
-	public StreamRecordSerializer<?> getTypeSerializerOut() {
-		return typeSerializerOut;
-	}
-
-	public void setSerializerOut(StreamRecordSerializer<?> typeSerializerOut) {
-		this.typeSerializerOut = typeSerializerOut;
-	}
-
-	public Class<? extends AbstractInvokable> getJobVertexClass() {
-		return jobVertexClass;
-	}
-
-	public InputFormat<String, ?> getInputFormat() {
-		return inputFormat;
-	}
-
-	public void setInputFormat(InputFormat<String, ?> inputFormat) {
-		this.inputFormat = inputFormat;
-	}
-
-}