You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:39:00 UTC
[19/19] flink git commit: [streaming] Major internal renaming and
restructure
[streaming] Major internal renaming and restructure
Closes #594
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4754a97b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4754a97b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4754a97b
Branch: refs/heads/master
Commit: 4754a97b19c96647886a46131750db26a0b5f618
Parents: f1b445b
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Apr 13 20:46:33 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Apr 15 11:35:12 2015 +0200
----------------------------------------------------------------------
.../streaming/connectors/ConnectorSource.java | 4 +-
.../streaming/connectors/flume/FlumeSink.java | 2 +-
.../connectors/kafka/KafkaProducerExample.java | 2 +-
.../connectors/kafka/api/KafkaSink.java | 2 +-
.../kafka/api/simple/PersistentKafkaSource.java | 3 +-
.../streaming/connectors/rabbitmq/RMQSink.java | 2 +-
.../connectors/twitter/TwitterSource.java | 4 +-
.../connectors/twitter/TwitterStreaming.java | 2 +-
.../streaming/connectors/kafka/KafkaITCase.java | 4 +-
.../flink/streaming/api/JSONGenerator.java | 180 --------
.../flink/streaming/api/StreamConfig.java | 392 ----------------
.../apache/flink/streaming/api/StreamEdge.java | 120 -----
.../apache/flink/streaming/api/StreamGraph.java | 458 -------------------
.../apache/flink/streaming/api/StreamNode.java | 201 --------
.../api/StreamingJobGraphGenerator.java | 346 --------------
.../flink/streaming/api/WindowingOptimizer.java | 151 ------
.../api/collector/CollectorWrapper.java | 2 +-
.../streaming/api/collector/StreamOutput.java | 7 +-
.../BroadcastOutputSelectorWrapper.java | 2 +-
.../selector/DirectedOutputSelectorWrapper.java | 2 +-
.../selector/OutputSelectorWrapper.java | 2 +-
.../api/datastream/ConnectedDataStream.java | 52 +--
.../streaming/api/datastream/DataStream.java | 82 ++--
.../api/datastream/DataStreamSink.java | 6 +-
.../api/datastream/DataStreamSource.java | 8 +-
.../api/datastream/DiscretizedStream.java | 44 +-
.../api/datastream/GroupedDataStream.java | 16 +-
.../api/datastream/IterativeDataStream.java | 6 +-
.../datastream/SingleOutputStreamOperator.java | 14 +-
.../api/datastream/StreamProjection.java | 52 +--
.../api/datastream/WindowedDataStream.java | 38 +-
.../temporal/StreamCrossOperator.java | 112 +++++
.../datastream/temporal/StreamJoinOperator.java | 273 +++++++++++
.../datastream/temporal/TemporalOperator.java | 124 +++++
.../api/datastream/temporal/TemporalWindow.java | 45 ++
.../temporaloperator/StreamCrossOperator.java | 112 -----
.../temporaloperator/StreamJoinOperator.java | 273 -----------
.../temporaloperator/TemporalOperator.java | 124 -----
.../temporaloperator/TemporalWindow.java | 45 --
.../environment/StreamExecutionEnvironment.java | 32 +-
.../api/function/RichWindowMapFunction.java | 40 --
.../api/function/WindowMapFunction.java | 35 --
.../aggregation/AggregationFunction.java | 35 --
.../aggregation/ComparableAggregator.java | 238 ----------
.../api/function/aggregation/Comparator.java | 104 -----
.../api/function/aggregation/SumAggregator.java | 173 -------
.../api/function/aggregation/SumFunction.java | 102 -----
.../api/function/co/CoFlatMapFunction.java | 42 --
.../api/function/co/CoMapFunction.java | 41 --
.../api/function/co/CoReduceFunction.java | 107 -----
.../api/function/co/CoWindowFunction.java | 30 --
.../api/function/co/CrossWindowFunction.java | 44 --
.../api/function/co/JoinWindowFunction.java | 77 ----
.../api/function/co/RichCoFlatMapFunction.java | 40 --
.../api/function/co/RichCoMapFunction.java | 40 --
.../api/function/co/RichCoReduceFunction.java | 40 --
.../api/function/co/RichCoWindowFunction.java | 34 --
.../api/function/sink/FileSinkFunction.java | 118 -----
.../function/sink/FileSinkFunctionByMillis.java | 59 ---
.../api/function/sink/PrintSinkFunction.java | 96 ----
.../api/function/sink/RichSinkFunction.java | 28 --
.../api/function/sink/SinkFunction.java | 38 --
.../api/function/sink/SocketClientSink.java | 130 ------
.../api/function/sink/WriteFormat.java | 43 --
.../api/function/sink/WriteFormatAsCsv.java | 49 --
.../api/function/sink/WriteFormatAsText.java | 47 --
.../api/function/sink/WriteSinkFunction.java | 92 ----
.../sink/WriteSinkFunctionByMillis.java | 50 --
.../function/source/FileMonitoringFunction.java | 133 ------
.../api/function/source/FileReadFunction.java | 51 ---
.../api/function/source/FileSourceFunction.java | 135 ------
.../function/source/FromElementsFunction.java | 61 ---
.../function/source/GenSequenceFunction.java | 61 ---
.../function/source/GenericSourceFunction.java | 25 -
.../function/source/ParallelSourceFunction.java | 34 --
.../source/RichParallelSourceFunction.java | 33 --
.../api/function/source/RichSourceFunction.java | 46 --
.../source/SocketTextStreamFunction.java | 148 ------
.../api/function/source/SourceFunction.java | 56 ---
.../api/functions/RichWindowMapFunction.java | 40 ++
.../api/functions/WindowMapFunction.java | 35 ++
.../aggregation/AggregationFunction.java | 35 ++
.../aggregation/ComparableAggregator.java | 238 ++++++++++
.../api/functions/aggregation/Comparator.java | 104 +++++
.../functions/aggregation/SumAggregator.java | 173 +++++++
.../api/functions/aggregation/SumFunction.java | 102 +++++
.../api/functions/co/CoFlatMapFunction.java | 42 ++
.../api/functions/co/CoMapFunction.java | 41 ++
.../api/functions/co/CoReduceFunction.java | 107 +++++
.../api/functions/co/CoWindowFunction.java | 30 ++
.../api/functions/co/CrossWindowFunction.java | 44 ++
.../api/functions/co/JoinWindowFunction.java | 77 ++++
.../api/functions/co/RichCoFlatMapFunction.java | 40 ++
.../api/functions/co/RichCoMapFunction.java | 40 ++
.../api/functions/co/RichCoReduceFunction.java | 40 ++
.../api/functions/co/RichCoWindowFunction.java | 34 ++
.../api/functions/sink/FileSinkFunction.java | 118 +++++
.../sink/FileSinkFunctionByMillis.java | 59 +++
.../api/functions/sink/PrintSinkFunction.java | 96 ++++
.../api/functions/sink/RichSinkFunction.java | 28 ++
.../api/functions/sink/SinkFunction.java | 38 ++
.../api/functions/sink/SocketClientSink.java | 130 ++++++
.../api/functions/sink/WriteFormat.java | 43 ++
.../api/functions/sink/WriteFormatAsCsv.java | 49 ++
.../api/functions/sink/WriteFormatAsText.java | 47 ++
.../api/functions/sink/WriteSinkFunction.java | 92 ++++
.../sink/WriteSinkFunctionByMillis.java | 50 ++
.../source/FileMonitoringFunction.java | 133 ++++++
.../api/functions/source/FileReadFunction.java | 51 +++
.../functions/source/FileSourceFunction.java | 135 ++++++
.../functions/source/FromElementsFunction.java | 61 +++
.../functions/source/GenSequenceFunction.java | 61 +++
.../functions/source/GenericSourceFunction.java | 25 +
.../source/ParallelSourceFunction.java | 34 ++
.../source/RichParallelSourceFunction.java | 33 ++
.../functions/source/RichSourceFunction.java | 46 ++
.../source/SocketTextStreamFunction.java | 148 ++++++
.../api/functions/source/SourceFunction.java | 56 +++
.../streaming/api/graph/JSONGenerator.java | 180 ++++++++
.../flink/streaming/api/graph/StreamConfig.java | 392 ++++++++++++++++
.../flink/streaming/api/graph/StreamEdge.java | 120 +++++
.../flink/streaming/api/graph/StreamGraph.java | 458 +++++++++++++++++++
.../flink/streaming/api/graph/StreamNode.java | 201 ++++++++
.../api/graph/StreamingJobGraphGenerator.java | 346 ++++++++++++++
.../streaming/api/graph/WindowingOptimizer.java | 151 ++++++
.../api/invokable/ChainableInvokable.java | 57 ---
.../streaming/api/invokable/SinkInvokable.java | 43 --
.../api/invokable/SourceInvokable.java | 50 --
.../api/invokable/StreamInvokable.java | 209 ---------
.../invokable/operator/CounterInvokable.java | 45 --
.../api/invokable/operator/FilterInvokable.java | 49 --
.../invokable/operator/FlatMapInvokable.java | 45 --
.../operator/GroupedFoldInvokable.java | 57 ---
.../operator/GroupedReduceInvokable.java | 52 ---
.../api/invokable/operator/MapInvokable.java | 45 --
.../invokable/operator/ProjectInvokable.java | 64 ---
.../invokable/operator/StreamFoldInvokable.java | 54 ---
.../operator/StreamReduceInvokable.java | 55 ---
.../operator/co/CoFlatMapInvokable.java | 54 ---
.../operator/co/CoGroupedReduceInvokable.java | 88 ----
.../api/invokable/operator/co/CoInvokable.java | 155 -------
.../invokable/operator/co/CoMapInvokable.java | 54 ---
.../operator/co/CoReduceInvokable.java | 70 ---
.../operator/co/CoWindowInvokable.java | 197 --------
.../operator/windowing/EmptyWindowFilter.java | 32 --
.../windowing/GroupedActiveDiscretizer.java | 116 -----
.../windowing/GroupedStreamDiscretizer.java | 128 ------
.../windowing/GroupedWindowBufferInvokable.java | 73 ---
.../windowing/ParallelGroupedMerge.java | 41 --
.../operator/windowing/ParallelMerge.java | 142 ------
.../operator/windowing/StreamDiscretizer.java | 223 ---------
.../windowing/WindowBufferInvokable.java | 67 ---
.../operator/windowing/WindowFlattener.java | 50 --
.../operator/windowing/WindowFolder.java | 70 ---
.../operator/windowing/WindowMapper.java | 64 ---
.../operator/windowing/WindowMerger.java | 70 ---
.../operator/windowing/WindowPartExtractor.java | 55 ---
.../operator/windowing/WindowPartitioner.java | 74 ---
.../operator/windowing/WindowReducer.java | 69 ---
.../api/operators/ChainableStreamOperator.java | 57 +++
.../streaming/api/operators/StreamCounter.java | 44 ++
.../streaming/api/operators/StreamFilter.java | 48 ++
.../streaming/api/operators/StreamFlatMap.java | 44 ++
.../streaming/api/operators/StreamFold.java | 53 +++
.../api/operators/StreamGroupedFold.java | 57 +++
.../api/operators/StreamGroupedReduce.java | 52 +++
.../streaming/api/operators/StreamMap.java | 44 ++
.../streaming/api/operators/StreamOperator.java | 209 +++++++++
.../streaming/api/operators/StreamProject.java | 63 +++
.../streaming/api/operators/StreamReduce.java | 54 +++
.../streaming/api/operators/StreamSink.java | 43 ++
.../streaming/api/operators/StreamSource.java | 50 ++
.../api/operators/co/CoStreamFlatMap.java | 54 +++
.../api/operators/co/CoStreamGroupedReduce.java | 88 ++++
.../streaming/api/operators/co/CoStreamMap.java | 54 +++
.../api/operators/co/CoStreamOperator.java | 155 +++++++
.../api/operators/co/CoStreamReduce.java | 70 +++
.../api/operators/co/CoStreamWindow.java | 197 ++++++++
.../operators/windowing/EmptyWindowFilter.java | 32 ++
.../windowing/GroupedActiveDiscretizer.java | 116 +++++
.../windowing/GroupedStreamDiscretizer.java | 128 ++++++
.../windowing/GroupedWindowBuffer.java | 73 +++
.../windowing/ParallelGroupedMerge.java | 41 ++
.../api/operators/windowing/ParallelMerge.java | 142 ++++++
.../operators/windowing/StreamDiscretizer.java | 223 +++++++++
.../operators/windowing/StreamWindowBuffer.java | 67 +++
.../operators/windowing/WindowFlattener.java | 50 ++
.../api/operators/windowing/WindowFolder.java | 70 +++
.../api/operators/windowing/WindowMapper.java | 64 +++
.../api/operators/windowing/WindowMerger.java | 70 +++
.../windowing/WindowPartExtractor.java | 55 +++
.../operators/windowing/WindowPartitioner.java | 74 +++
.../api/operators/windowing/WindowReducer.java | 69 +++
.../streaming/api/state/CircularFifoList.java | 112 +++++
.../api/state/NullableCircularBuffer.java | 362 +++++++++++++++
.../streaming/api/state/PartitionableState.java | 66 +++
.../api/streamrecord/StreamRecord.java | 134 ------
.../streamrecord/StreamRecordSerializer.java | 115 -----
.../flink/streaming/api/streamrecord/UID.java | 122 -----
.../api/streamvertex/CoStreamVertex.java | 143 ------
.../api/streamvertex/InputHandler.java | 94 ----
.../api/streamvertex/OutputHandler.java | 213 ---------
.../api/streamvertex/StreamIterationHead.java | 110 -----
.../api/streamvertex/StreamIterationTail.java | 114 -----
.../api/streamvertex/StreamTaskContext.java | 46 --
.../api/streamvertex/StreamVertex.java | 326 -------------
.../api/streamvertex/StreamVertexException.java | 68 ---
.../streamvertex/StreamingRuntimeContext.java | 120 -----
.../api/streamvertex/StreamingSuperstep.java | 60 ---
.../windowing/policy/ActiveTriggerPolicy.java | 10 +-
.../policy/CloneableEvictionPolicy.java | 2 +-
.../policy/CloneableTriggerPolicy.java | 2 +-
.../api/windowing/policy/EvictionPolicy.java | 2 +-
.../flink/streaming/io/BarrierBuffer.java | 279 -----------
.../flink/streaming/io/BlockingQueueBroker.java | 41 --
.../flink/streaming/io/BufferSpiller.java | 91 ----
.../flink/streaming/io/CoReaderIterator.java | 57 ---
.../flink/streaming/io/CoRecordReader.java | 289 ------------
.../streaming/io/IndexedMutableReader.java | 37 --
.../streaming/io/IndexedReaderIterator.java | 33 --
.../flink/streaming/io/InputGateFactory.java | 42 --
.../flink/streaming/io/RecordWriterFactory.java | 52 ---
.../apache/flink/streaming/io/SpillReader.java | 78 ----
.../streaming/io/SpillingBufferOrEvent.java | 66 ---
.../flink/streaming/io/StreamRecordWriter.java | 88 ----
.../io/StreamingAbstractRecordReader.java | 133 ------
.../io/StreamingMutableRecordReader.java | 43 --
.../flink/streaming/io/StreamingReader.java | 27 --
.../partitioner/BroadcastPartitioner.java | 55 ---
.../partitioner/DistributePartitioner.java | 46 --
.../partitioner/FieldsPartitioner.java | 50 --
.../partitioner/GlobalPartitioner.java | 38 --
.../partitioner/ShufflePartitioner.java | 49 --
.../partitioner/StreamPartitioner.java | 44 --
.../streaming/runtime/io/BarrierBuffer.java | 279 +++++++++++
.../runtime/io/BlockingQueueBroker.java | 41 ++
.../streaming/runtime/io/BufferSpiller.java | 91 ++++
.../streaming/runtime/io/CoReaderIterator.java | 57 +++
.../streaming/runtime/io/CoRecordReader.java | 289 ++++++++++++
.../runtime/io/IndexedMutableReader.java | 37 ++
.../runtime/io/IndexedReaderIterator.java | 33 ++
.../streaming/runtime/io/InputGateFactory.java | 42 ++
.../runtime/io/RecordWriterFactory.java | 52 +++
.../flink/streaming/runtime/io/SpillReader.java | 78 ++++
.../runtime/io/SpillingBufferOrEvent.java | 66 +++
.../runtime/io/StreamRecordWriter.java | 88 ++++
.../io/StreamingAbstractRecordReader.java | 133 ++++++
.../io/StreamingMutableRecordReader.java | 43 ++
.../streaming/runtime/io/StreamingReader.java | 27 ++
.../partitioner/BroadcastPartitioner.java | 55 +++
.../partitioner/DistributePartitioner.java | 46 ++
.../runtime/partitioner/FieldsPartitioner.java | 50 ++
.../runtime/partitioner/GlobalPartitioner.java | 38 ++
.../runtime/partitioner/ShufflePartitioner.java | 49 ++
.../runtime/partitioner/StreamPartitioner.java | 44 ++
.../runtime/streamrecord/StreamRecord.java | 103 +++++
.../streamrecord/StreamRecordSerializer.java | 110 +++++
.../streaming/runtime/tasks/CoStreamTask.java | 143 ++++++
.../streaming/runtime/tasks/InputHandler.java | 94 ++++
.../streaming/runtime/tasks/OutputHandler.java | 213 +++++++++
.../runtime/tasks/StreamIterationHead.java | 110 +++++
.../runtime/tasks/StreamIterationTail.java | 114 +++++
.../streaming/runtime/tasks/StreamTask.java | 326 +++++++++++++
.../runtime/tasks/StreamTaskContext.java | 46 ++
.../runtime/tasks/StreamTaskException.java | 68 +++
.../runtime/tasks/StreamingRuntimeContext.java | 120 +++++
.../runtime/tasks/StreamingSuperstep.java | 60 +++
.../flink/streaming/state/CircularFifoList.java | 112 -----
.../streaming/state/NullableCircularBuffer.java | 362 ---------------
.../streaming/state/PartitionableState.java | 66 ---
.../streaming/api/AggregationFunctionTest.java | 62 +--
.../flink/streaming/api/CoStreamTest.java | 6 +-
.../apache/flink/streaming/api/IterateTest.java | 2 +-
.../apache/flink/streaming/api/SourceTest.java | 4 +-
.../flink/streaming/api/TypeFillTest.java | 8 +-
.../api/collector/DirectedOutputTest.java | 2 +-
.../api/collector/StreamCollectorTest.java | 4 +-
.../operator/CounterInvokableTest.java | 39 --
.../api/invokable/operator/FilterTest.java | 51 ---
.../api/invokable/operator/FlatMapTest.java | 54 ---
.../operator/GroupedFoldInvokableTest.java | 67 ---
.../operator/GroupedReduceInvokableTest.java | 62 ---
.../api/invokable/operator/MapTest.java | 49 --
.../api/invokable/operator/ProjectTest.java | 67 ---
.../api/invokable/operator/StreamFoldTest.java | 56 ---
.../invokable/operator/StreamReduceTest.java | 54 ---
.../invokable/operator/co/CoFlatMapTest.java | 84 ----
.../operator/co/CoGroupedReduceTest.java | 125 -----
.../api/invokable/operator/co/CoMapTest.java | 57 ---
.../operator/co/CoStreamReduceTest.java | 71 ---
.../api/invokable/operator/co/CoWindowTest.java | 182 --------
.../operator/co/SelfConnectionTest.java | 252 ----------
.../windowing/GroupedStreamDiscretizerTest.java | 101 ----
.../operator/windowing/ParallelMergeTest.java | 119 -----
.../windowing/StreamDiscretizerTest.java | 117 -----
.../operator/windowing/WindowFlattenerTest.java | 53 ---
.../operator/windowing/WindowFolderTest.java | 61 ---
.../windowing/WindowIntegrationTest.java | 408 -----------------
.../operator/windowing/WindowMapperTest.java | 60 ---
.../operator/windowing/WindowMergerTest.java | 75 ---
.../windowing/WindowPartitionerTest.java | 75 ---
.../operator/windowing/WindowReducerTest.java | 61 ---
.../streaming/api/operators/CounterTest.java | 40 ++
.../streaming/api/operators/FilterTest.java | 52 +++
.../streaming/api/operators/FlatMapTest.java | 55 +++
.../api/operators/GroupedFoldTest.java | 67 +++
.../api/operators/GroupedReduceTest.java | 63 +++
.../flink/streaming/api/operators/MapTest.java | 50 ++
.../streaming/api/operators/ProjectTest.java | 68 +++
.../streaming/api/operators/StreamFoldTest.java | 57 +++
.../api/operators/StreamReduceTest.java | 55 +++
.../api/operators/co/CoFlatMapTest.java | 84 ++++
.../api/operators/co/CoGroupedReduceTest.java | 125 +++++
.../streaming/api/operators/co/CoMapTest.java | 57 +++
.../api/operators/co/CoStreamReduceTest.java | 71 +++
.../api/operators/co/CoWindowTest.java | 182 ++++++++
.../api/operators/co/SelfConnectionTest.java | 252 ++++++++++
.../windowing/GroupedStreamDiscretizerTest.java | 104 +++++
.../operators/windowing/ParallelMergeTest.java | 121 +++++
.../windowing/StreamDiscretizerTest.java | 117 +++++
.../windowing/WindowFlattenerTest.java | 54 +++
.../operators/windowing/WindowFolderTest.java | 62 +++
.../windowing/WindowIntegrationTest.java | 408 +++++++++++++++++
.../operators/windowing/WindowMapperTest.java | 61 +++
.../operators/windowing/WindowMergerTest.java | 76 +++
.../windowing/WindowPartitionerTest.java | 76 +++
.../operators/windowing/WindowReducerTest.java | 62 +++
.../streaming/api/state/OperatorStateTest.java | 44 ++
.../streaming/api/streamrecord/UIDTest.java | 53 ---
.../api/streamtask/MockRecordWriter.java | 45 ++
.../api/streamtask/StreamVertexTest.java | 186 ++++++++
.../api/streamvertex/MockRecordWriter.java | 45 --
.../api/streamvertex/StreamVertexTest.java | 186 --------
.../SlidingCountGroupedPreReducerTest.java | 2 +-
.../SlidingTimeGroupedPreReducerTest.java | 2 +-
.../flink/streaming/io/BarrierBufferIOTest.java | 159 -------
.../flink/streaming/io/BarrierBufferTest.java | 216 ---------
.../flink/streaming/io/CoRecordReaderTest.java | 90 ----
.../streaming/io/SpillingBufferOrEventTest.java | 96 ----
.../partitioner/BroadcastPartitionerTest.java | 55 ---
.../partitioner/DistributePartitionerTest.java | 56 ---
.../partitioner/FieldsPartitionerTest.java | 76 ---
.../partitioner/ForwardPartitionerTest.java | 55 ---
.../partitioner/GlobalPartitionerTest.java | 50 --
.../partitioner/ShufflePartitionerTest.java | 60 ---
.../runtime/io/BarrierBufferIOTest.java | 160 +++++++
.../streaming/runtime/io/BarrierBufferTest.java | 217 +++++++++
.../runtime/io/CoRecordReaderTest.java | 92 ++++
.../runtime/io/SpillingBufferOrEventTest.java | 99 ++++
.../partitioner/BroadcastPartitionerTest.java | 56 +++
.../partitioner/DistributePartitionerTest.java | 57 +++
.../partitioner/FieldsPartitionerTest.java | 77 ++++
.../partitioner/ForwardPartitionerTest.java | 56 +++
.../partitioner/GlobalPartitionerTest.java | 51 +++
.../partitioner/ShufflePartitionerTest.java | 61 +++
.../streaming/state/OperatorStateTest.java | 44 --
.../flink/streaming/util/MockCoContext.java | 26 +-
.../flink/streaming/util/MockContext.java | 26 +-
.../streaming/util/MockRecordWriterFactory.java | 2 +-
.../apache/flink/streaming/util/MockSource.java | 2 +-
.../streaming/util/TestListResultSink.java | 2 +-
.../streaming/util/TestStreamEnvironment.java | 2 -
.../examples/iteration/IterateExample.java | 2 +-
.../streaming/examples/join/WindowJoin.java | 4 +-
.../ml/IncrementalLearningSkeleton.java | 6 +-
.../examples/windowing/SessionWindowing.java | 2 +-
.../examples/windowing/StockPrices.java | 4 +-
.../windowing/TopSpeedWindowingExample.java | 2 +-
.../scala/ScalaStreamingAggregator.java | 6 +-
.../api/scala/ConnectedDataStream.scala | 16 +-
.../flink/streaming/api/scala/DataStream.scala | 41 +-
.../api/scala/StreamCrossOperator.scala | 16 +-
.../api/scala/StreamExecutionEnvironment.scala | 4 +-
.../api/scala/StreamJoinOperator.scala | 12 +-
.../streaming/api/scala/TemporalOperator.scala | 4 +-
.../api/scala/WindowedDataStream.scala | 6 +-
.../java/table/JavaStreamingTranslator.scala | 7 +-
.../StreamCheckpointingITCase.java | 4 +-
.../test/classloading/jar/StreamingProgram.java | 2 +-
.../ProcessFailureStreamingRecoveryITCase.java | 6 +-
380 files changed, 15525 insertions(+), 15707 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
index a7b0b06..1e645cd 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
-import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 8112159..50f5770 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.flume;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index b1e0f0b..e13ff72 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.util.Collector;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 376d96f..85c4d35 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -22,7 +22,7 @@ import java.util.Properties;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 7cd8a28..cb89248 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -21,9 +21,9 @@ import java.util.HashMap;
import java.util.Map;
import com.google.common.base.Preconditions;
+
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaIdleConsumerIterator;
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffs
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 53db1c8..fa729d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import java.io.IOException;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index 740907f..00ec156 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -25,8 +25,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index a32fe1b..e500fef 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.twitter;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONException;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index b416839..8205799 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -37,8 +37,8 @@ import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JSONGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JSONGenerator.java
deleted file mode 100644
index ad9326a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JSONGenerator.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.sling.commons.json.JSONArray;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-
-public class JSONGenerator {
-
- public static final String STEPS = "step_function";
- public static final String ID = "id";
- public static final String SIDE = "side";
- public static final String SHIP_STRATEGY = "ship_strategy";
- public static final String PREDECESSORS = "predecessors";
- public static final String TYPE = "type";
- public static final String PACT = "pact";
- public static final String CONTENTS = "contents";
- public static final String PARALLELISM = "parallelism";
-
- private StreamGraph streamGraph;
-
- public JSONGenerator(StreamGraph streamGraph) {
- this.streamGraph = streamGraph;
- }
-
- public String getJSON() throws JSONException {
- JSONObject json = new JSONObject();
- JSONArray nodes = new JSONArray();
- json.put("nodes", nodes);
- List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
- Collections.sort(operatorIDs);
- visit(nodes, operatorIDs, new HashMap<Integer, Integer>());
- return json.toString();
- }
-
- private void visit(JSONArray jsonArray, List<Integer> toVisit,
- Map<Integer, Integer> edgeRemapings) throws JSONException {
-
- Integer vertexID = toVisit.get(0);
- StreamNode vertex = streamGraph.getVertex(vertexID);
-
- if (streamGraph.getSourceIDs().contains(vertexID)
- || Collections.disjoint(vertex.getInEdges(), toVisit)) {
-
- JSONObject node = new JSONObject();
- decorateNode(vertexID, node);
-
- if (!streamGraph.getSourceIDs().contains(vertexID)) {
- JSONArray inputs = new JSONArray();
- node.put(PREDECESSORS, inputs);
-
- for (StreamEdge inEdge : vertex.getInEdges()) {
- int inputID = inEdge.getSourceID();
-
- Integer mappedID = (edgeRemapings.keySet().contains(inputID)) ? edgeRemapings
- .get(inputID) : inputID;
- decorateEdge(inputs, vertexID, mappedID, inputID);
- }
- }
- jsonArray.put(node);
- toVisit.remove(vertexID);
- } else {
- Integer iterationHead = -1;
- for (StreamEdge inEdge : vertex.getInEdges()) {
- int operator = inEdge.getSourceID();
-
- if (streamGraph.vertexIDtoLoop.containsKey(operator)) {
- iterationHead = operator;
- }
- }
-
- JSONObject obj = new JSONObject();
- JSONArray iterationSteps = new JSONArray();
- obj.put(STEPS, iterationSteps);
- obj.put(ID, iterationHead);
- obj.put(PACT, "IterativeDataStream");
- obj.put(PARALLELISM, streamGraph.getVertex(iterationHead).getParallelism());
- obj.put(CONTENTS, "Stream Iteration");
- JSONArray iterationInputs = new JSONArray();
- obj.put(PREDECESSORS, iterationInputs);
- toVisit.remove(iterationHead);
- visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
- jsonArray.put(obj);
- }
-
- if (!toVisit.isEmpty()) {
- visit(jsonArray, toVisit, edgeRemapings);
- }
- }
-
- private void visitIteration(JSONArray jsonArray, List<Integer> toVisit, int headId,
- Map<Integer, Integer> edgeRemapings, JSONArray iterationInEdges) throws JSONException {
-
- Integer vertexID = toVisit.get(0);
- StreamNode vertex = streamGraph.getVertex(vertexID);
- toVisit.remove(vertexID);
-
- // Ignoring head and tail to avoid redundancy
- if (!streamGraph.vertexIDtoLoop.containsKey(vertexID)) {
- JSONObject obj = new JSONObject();
- jsonArray.put(obj);
- decorateNode(vertexID, obj);
- JSONArray inEdges = new JSONArray();
- obj.put(PREDECESSORS, inEdges);
-
- for (StreamEdge inEdge : vertex.getInEdges()) {
- int inputID = inEdge.getSourceID();
-
- if (edgeRemapings.keySet().contains(inputID)) {
- decorateEdge(inEdges, vertexID, inputID, inputID);
- } else if (!streamGraph.vertexIDtoLoop.containsKey(inputID)) {
- decorateEdge(iterationInEdges, vertexID, inputID, inputID);
- }
- }
-
- edgeRemapings.put(vertexID, headId);
- visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
- }
-
- }
-
- private void decorateEdge(JSONArray inputArray, int vertexID, int mappedInputID, int inputID)
- throws JSONException {
- JSONObject input = new JSONObject();
- inputArray.put(input);
- input.put(ID, mappedInputID);
- input.put(SHIP_STRATEGY, streamGraph.getEdge(inputID, vertexID).getPartitioner()
- .getStrategy());
- input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
- }
-
- private void decorateNode(Integer vertexID, JSONObject node) throws JSONException {
-
- StreamNode vertex = streamGraph.getVertex(vertexID);
-
- node.put(ID, vertexID);
- node.put(TYPE, vertex.getOperatorName());
-
- if (streamGraph.getSourceIDs().contains(vertexID)) {
- node.put(PACT, "Data Source");
- } else {
- node.put(PACT, "Data Stream");
- }
-
- StreamInvokable<?, ?> invokable = streamGraph.getVertex(vertexID).getInvokable();
-
- if (invokable != null && invokable.getUserFunction() != null) {
- node.put(CONTENTS, vertex.getOperatorName() + " at "
- + invokable.getUserFunction().getClass().getSimpleName());
- } else {
- node.put(CONTENTS, vertex.getOperatorName());
- }
-
- node.put(PARALLELISM, streamGraph.getVertex(vertexID).getParallelism());
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
deleted file mode 100644
index b04e756..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
-import org.apache.flink.util.InstantiationUtil;
-
-public class StreamConfig implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
- private static final String NUMBER_OF_INPUTS = "numberOfInputs";
- private static final String CHAINED_OUTPUTS = "chainedOutputs";
- private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
- private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
- private static final String OUTPUT_NAME = "outputName_";
- private static final String VERTEX_NAME = "vertexID";
- private static final String OPERATOR_NAME = "operatorName";
- private static final String ITERATION_ID = "iterationId";
- private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
- private static final String SERIALIZEDUDF = "serializedUDF";
- private static final String USER_FUNCTION = "userFunction";
- private static final String BUFFER_TIMEOUT = "bufferTimeout";
- private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
- private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
- private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
- private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
- private static final String ITERATON_WAIT = "iterationWait";
- private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
- private static final String EDGES_IN_ORDER = "edgesInOrder";
- private static final String OUT_STREAM_EDGES = "outStreamEdges";
- private static final String IN_STREAM_EDGES = "inStreamEdges";
-
- // DEFAULT VALUES
- private static final long DEFAULT_TIMEOUT = 100;
- public static final String STATE_MONITORING = "STATE_MONITORING";
-
- // CONFIG METHODS
-
- private Configuration config;
-
- public StreamConfig(Configuration config) {
- this.config = config;
- }
-
- public Configuration getConfiguration() {
- return config;
- }
-
- public void setVertexID(Integer vertexID) {
- config.setInteger(VERTEX_NAME, vertexID);
- }
-
- public Integer getVertexID() {
- return config.getInteger(VERTEX_NAME, -1);
- }
-
- public void setOperatorName(String name) {
- config.setString(OPERATOR_NAME, name);
- }
-
- public String getOperatorName() {
- return config.getString(OPERATOR_NAME, "Missing");
- }
-
- public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
- setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
- }
-
- public void setTypeSerializerIn2(StreamRecordSerializer<?> serializer) {
- setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
- }
-
- public void setTypeSerializerOut1(StreamRecordSerializer<?> serializer) {
- setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
- }
-
- public void setTypeSerializerOut2(StreamRecordSerializer<?> serializer) {
- setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
- }
-
- @SuppressWarnings("unchecked")
- public <T> StreamRecordSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
- try {
- return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
- TYPE_SERIALIZER_IN_1, cl);
- } catch (Exception e) {
- throw new RuntimeException("Could not instantiate serializer.");
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T> StreamRecordSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
- try {
- return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
- TYPE_SERIALIZER_IN_2, cl);
- } catch (Exception e) {
- throw new RuntimeException("Could not instantiate serializer.");
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T> StreamRecordSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
- try {
- return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
- TYPE_SERIALIZER_OUT_1, cl);
- } catch (Exception e) {
- throw new RuntimeException("Could not instantiate serializer.");
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T> StreamRecordSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
- try {
- return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
- TYPE_SERIALIZER_OUT_2, cl);
- } catch (Exception e) {
- throw new RuntimeException("Could not instantiate serializer.");
- }
- }
-
- private void setTypeSerializer(String key, StreamRecordSerializer<?> typeWrapper) {
- config.setBytes(key, SerializationUtils.serialize(typeWrapper));
- }
-
- public void setBufferTimeout(long timeout) {
- config.setLong(BUFFER_TIMEOUT, timeout);
- }
-
- public long getBufferTimeout() {
- return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
- }
-
- public void setUserInvokable(StreamInvokable<?, ?> invokableObject) {
- if (invokableObject != null) {
- config.setClass(USER_FUNCTION, invokableObject.getClass());
-
- try {
- config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject));
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize invokable object "
- + invokableObject.getClass(), e);
- }
- }
- }
-
- @SuppressWarnings({ "unchecked" })
- public <T> T getUserInvokable(ClassLoader cl) {
- try {
- return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
- } catch (Exception e) {
- throw new StreamVertexException("Cannot instantiate user function", e);
- }
- }
-
- public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) {
- try {
- config.setBytes(OUTPUT_SELECTOR_WRAPPER, SerializationUtils.serialize(outputSelectorWrapper));
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize OutputSelectorWrapper");
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
- try {
- return (OutputSelectorWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config,
- OUTPUT_SELECTOR_WRAPPER, cl);
- } catch (Exception e) {
- throw new StreamVertexException("Cannot deserialize and instantiate OutputSelectorWrapper", e);
- }
- }
-
- public void setIterationId(Integer iterationId) {
- config.setInteger(ITERATION_ID, iterationId);
- }
-
- public Integer getIterationId() {
- return config.getInteger(ITERATION_ID, 0);
- }
-
- public void setIterationWaitTime(long time) {
- config.setLong(ITERATON_WAIT, time);
- }
-
- public long getIterationWaitTime() {
- return config.getLong(ITERATON_WAIT, 0);
- }
-
- public void setSelectedNames(Integer output, List<String> selected) {
- if (selected != null) {
- config.setBytes(OUTPUT_NAME + output,
- SerializationUtils.serialize((Serializable) selected));
- } else {
- config.setBytes(OUTPUT_NAME + output,
- SerializationUtils.serialize(new ArrayList<String>()));
- }
- }
-
- @SuppressWarnings("unchecked")
- public List<String> getSelectedNames(Integer output) {
- return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output,
- null));
- }
-
- public void setNumberOfInputs(int numberOfInputs) {
- config.setInteger(NUMBER_OF_INPUTS, numberOfInputs);
- }
-
- public int getNumberOfInputs() {
- return config.getInteger(NUMBER_OF_INPUTS, 0);
- }
-
- public void setNumberOfOutputs(int numberOfOutputs) {
- config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs);
- }
-
- public int getNumberOfOutputs() {
- return config.getInteger(NUMBER_OF_OUTPUTS, 0);
- }
-
- public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
- config.setBytes(NONCHAINED_OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs));
- }
-
- @SuppressWarnings("unchecked")
- public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
- try {
- return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
- } catch (Exception e) {
- throw new RuntimeException("Could not instantiate outputs.");
- }
- }
-
- public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
- config.setBytes(CHAINED_OUTPUTS,
- SerializationUtils.serialize((Serializable) chainedOutputs));
- }
-
- @SuppressWarnings("unchecked")
- public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
- try {
- return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config,
- CHAINED_OUTPUTS, cl);
- } catch (Exception e) {
- throw new RuntimeException("Could not instantiate chained outputs.");
- }
- }
-
- public void setOutEdges(List<StreamEdge> outEdges) {
- config.setBytes(OUT_STREAM_EDGES, SerializationUtils.serialize((Serializable) outEdges));
- }
-
- @SuppressWarnings("unchecked")
- public List<StreamEdge> getOutEdges(ClassLoader cl) {
- try {
- return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
- this.config, OUT_STREAM_EDGES, cl);
- } catch (Exception e) {
- throw new RuntimeException("Could not instantiate outputs.");
- }
- }
-
- public void setInPhysicalEdges(List<StreamEdge> inEdges) {
- config.setBytes(IN_STREAM_EDGES, SerializationUtils.serialize((Serializable) inEdges));
- }
-
- @SuppressWarnings("unchecked")
- public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
- try {
- return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
- this.config, IN_STREAM_EDGES, cl);
- } catch (Exception e) {
- throw new RuntimeException("Could not instantiate inputs.");
- }
- }
-
- public void setStateMonitoring(boolean stateMonitoring) {
-
- config.setBoolean(STATE_MONITORING, stateMonitoring);
-
- }
-
- public boolean getStateMonitoring()
- {
- return config.getBoolean(STATE_MONITORING, false);
- }
-
- public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
- config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
- }
-
- @SuppressWarnings("unchecked")
- public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
- try {
- return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
- this.config, EDGES_IN_ORDER, cl);
- } catch (Exception e) {
- throw new RuntimeException("Could not instantiate outputs.");
- }
- }
-
- public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
- config.setBytes(CHAINED_TASK_CONFIG,
- SerializationUtils.serialize((Serializable) chainedTaskConfigs));
- }
-
- @SuppressWarnings("unchecked")
- public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
- try {
-
- Map<Integer, StreamConfig> confs = (Map<Integer, StreamConfig>) InstantiationUtil
- .readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
-
- return confs == null ? new HashMap<Integer, StreamConfig>() : confs;
- } catch (Exception e) {
- throw new RuntimeException("Could not instantiate configuration.");
- }
- }
-
- public void setChainStart() {
- config.setBoolean(IS_CHAINED_VERTEX, true);
- }
-
- public boolean isChainStart() {
- return config.getBoolean(IS_CHAINED_VERTEX, false);
- }
-
- @Override
- public String toString() {
-
- ClassLoader cl = getClass().getClassLoader();
-
- StringBuilder builder = new StringBuilder();
- builder.append("\n=======================");
- builder.append("Stream Config");
- builder.append("=======================");
- builder.append("\nTask name: " + getVertexID());
- builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
- builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
- builder.append("\nOutput names: " + getNonChainedOutputs(cl));
- builder.append("\nPartitioning:");
- for (StreamEdge output : getNonChainedOutputs(cl)) {
- int outputname = output.getTargetID();
- builder.append("\n\t" + outputname + ": " + output.getPartitioner());
- }
-
- builder.append("\nChained subtasks: " + getChainedOutputs(cl));
-
- try {
- builder.append("\nInvokable: " + getUserInvokable(cl).getClass().getSimpleName());
- } catch (Exception e) {
- builder.append("\nInvokable: Missing");
- }
- builder.append("\nBuffer timeout: " + getBufferTimeout());
- builder.append("\nState Monitoring: " + getStateMonitoring());
- if (isChainStart() && getChainedOutputs(cl).size() > 0) {
- builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
- builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
- }
-
- return builder.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
deleted file mode 100644
index 066f320..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamEdge.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-
-/**
- * An edge in the streaming topology. One edge like this does not necessarily
- * gets converted to a connection between two job vertices (due to
- * chaining/optimization).
- */
-public class StreamEdge implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- final private String edgeId;
-
- final private StreamNode sourceVertex;
- final private StreamNode targetVertex;
-
- /**
- * The type number of the input for co-tasks.
- */
- final private int typeNumber;
-
- /**
- * A list of output names that the target vertex listens to (if there is
- * output selection).
- */
- final private List<String> selectedNames;
- final private StreamPartitioner<?> outputPartitioner;
-
- public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
- List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
- this.sourceVertex = sourceVertex;
- this.targetVertex = targetVertex;
- this.typeNumber = typeNumber;
- this.selectedNames = selectedNames;
- this.outputPartitioner = outputPartitioner;
-
- this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
- + "_" + outputPartitioner;
- }
-
- public StreamNode getSourceVertex() {
- return sourceVertex;
- }
-
- public StreamNode getTargetVertex() {
- return targetVertex;
- }
-
- public int getSourceID() {
- return sourceVertex.getID();
- }
-
- public int getTargetID() {
- return targetVertex.getID();
- }
-
- public int getTypeNumber() {
- return typeNumber;
- }
-
- public List<String> getSelectedNames() {
- return selectedNames;
- }
-
- public StreamPartitioner<?> getPartitioner() {
- return outputPartitioner;
- }
-
- @Override
- public int hashCode() {
- return edgeId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- StreamEdge that = (StreamEdge) o;
-
- if (!edgeId.equals(that.edgeId)) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public String toString() {
- return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
- + ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
- + ')';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
deleted file mode 100644
index 9e4a7e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
-import org.apache.flink.streaming.api.streamvertex.StreamVertex;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.sling.commons.json.JSONException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class representing the streaming topology. It contains all the information
- * necessary to build the jobgraph for the execution.
- *
- */
-public class StreamGraph extends StreamingPlan {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
- private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
- private String jobName = DEAFULT_JOB_NAME;
-
- private final StreamExecutionEnvironment environemnt;
- private final ExecutionConfig executionConfig;
-
- private boolean checkpointingEnabled = false;
- private long checkpointingInterval = 5000;
- private boolean chaining = true;
-
- private final Map<Integer, StreamNode> streamNodes;
- private final Set<Integer> sources;
-
- private final Map<Integer, StreamLoop> streamLoops;
- protected final Map<Integer, StreamLoop> vertexIDtoLoop;
-
- public StreamGraph(StreamExecutionEnvironment environment) {
-
- this.environemnt = environment;
- executionConfig = environment.getConfig();
-
- streamNodes = new HashMap<Integer, StreamNode>();
- streamLoops = new HashMap<Integer, StreamLoop>();
- vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>();
- sources = new HashSet<Integer>();
- }
-
- protected ExecutionConfig getExecutionConfig() {
- return executionConfig;
- }
-
- public void setJobName(String jobName) {
- this.jobName = jobName;
- }
-
- public void setChaining(boolean chaining) {
- this.chaining = chaining;
- }
-
- public void setCheckpointingEnabled(boolean checkpointingEnabled) {
- this.checkpointingEnabled = checkpointingEnabled;
- }
-
- public void setCheckpointingInterval(long checkpointingInterval) {
- this.checkpointingInterval = checkpointingInterval;
- }
-
- public long getCheckpointingInterval() {
- return checkpointingInterval;
- }
-
- public boolean isChainingEnabled() {
- return chaining;
- }
-
- public boolean isCheckpointingEnabled() {
- return checkpointingEnabled;
- }
-
- public boolean isIterative() {
- return !streamLoops.isEmpty();
- }
-
- public <IN, OUT> void addSource(Integer vertexID, StreamInvokable<IN, OUT> invokableObject,
- TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
- addOperator(vertexID, invokableObject, inTypeInfo, outTypeInfo, operatorName);
- sources.add(vertexID);
- }
-
- public <IN, OUT> void addOperator(Integer vertexID, StreamInvokable<IN, OUT> invokableObject,
- TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
-
- addNode(vertexID, StreamVertex.class, invokableObject, operatorName);
-
- StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
- inTypeInfo, executionConfig) : null;
-
- StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
- && !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
- outTypeInfo, executionConfig) : null;
-
- setSerializers(vertexID, inSerializer, null, outSerializer);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex: {}", vertexID);
- }
- }
-
- public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
- CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo,
- TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
-
- addNode(vertexID, CoStreamVertex.class, taskInvokableObject, operatorName);
-
- StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
- && !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
- outTypeInfo, executionConfig) : null;
-
- setSerializers(vertexID, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
- new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), outSerializer);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("CO-TASK: {}", vertexID);
- }
- }
-
- public void addIterationHead(Integer vertexID, Integer iterationHead, Integer iterationID,
- long timeOut) {
-
- addNode(vertexID, StreamIterationHead.class, null, null);
-
- chaining = false;
-
- StreamLoop iteration = new StreamLoop(iterationID, getVertex(iterationHead), timeOut);
- streamLoops.put(iterationID, iteration);
- vertexIDtoLoop.put(vertexID, iteration);
-
- setSerializersFrom(iterationHead, vertexID);
- getVertex(vertexID).setOperatorName("IterationHead-" + iterationHead);
-
- int outpartitionerIndex = getVertex(iterationHead).getInEdgeIndices().get(0);
- StreamPartitioner<?> outputPartitioner = getVertex(outpartitionerIndex).getOutEdges()
- .get(0).getPartitioner();
-
- addEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("ITERATION SOURCE: {}", vertexID);
- }
-
- sources.add(vertexID);
- }
-
- public void addIterationTail(Integer vertexID, Integer iterationTail, Integer iterationID,
- long waitTime) {
-
- if (getVertex(iterationTail).getBufferTimeout() == 0) {
- throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
- }
-
- addNode(vertexID, StreamIterationTail.class, null, null).setParallelism(
- getVertex(iterationTail).getParallelism());
-
- StreamLoop iteration = streamLoops.get(iterationID);
- iteration.setTail(getVertex(iterationTail));
- vertexIDtoLoop.put(vertexID, iteration);
-
- setSerializersFrom(iterationTail, vertexID);
- getVertex(vertexID).setOperatorName("IterationTail-" + iterationTail);
-
- setParallelism(iteration.getHead().getID(), getVertex(iterationTail).getParallelism());
- setBufferTimeout(iteration.getHead().getID(), getVertex(iterationTail).getBufferTimeout());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("ITERATION SINK: {}", vertexID);
- }
-
- }
-
- protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
- StreamInvokable<?, ?> invokableObject, String operatorName) {
-
- StreamNode vertex = new StreamNode(environemnt, vertexID, invokableObject, operatorName,
- new ArrayList<OutputSelector<?>>(), vertexClass);
-
- streamNodes.put(vertexID, vertex);
-
- return vertex;
- }
-
- public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID,
- StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {
-
- StreamEdge edge = new StreamEdge(getVertex(upStreamVertexID),
- getVertex(downStreamVertexID), typeNumber, outputNames, partitionerObject);
- getVertex(edge.getSourceID()).addOutEdge(edge);
- getVertex(edge.getTargetID()).addInEdge(edge);
- }
-
- public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
- getVertex(vertexID).addOutputSelector(outputSelector);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Outputselector set for {}", vertexID);
- }
-
- }
-
- public void setParallelism(Integer vertexID, int parallelism) {
- getVertex(vertexID).setParallelism(parallelism);
- }
-
- public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
- getVertex(vertexID).setBufferTimeout(bufferTimeout);
- }
-
- private void setSerializers(Integer vertexID, StreamRecordSerializer<?> in1,
- StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out) {
- StreamNode vertex = getVertex(vertexID);
- vertex.setSerializerIn1(in1);
- vertex.setSerializerIn2(in2);
- vertex.setSerializerOut(out);
- }
-
- private void setSerializersFrom(Integer from, Integer to) {
- StreamNode fromVertex = getVertex(from);
- StreamNode toVertex = getVertex(to);
-
- toVertex.setSerializerIn1(fromVertex.getTypeSerializerOut());
- toVertex.setSerializerOut(fromVertex.getTypeSerializerIn1());
- }
-
- public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) {
- StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType,
- executionConfig);
- getVertex(vertexID).setSerializerOut(serializer);
- }
-
- public <IN, OUT> void setInvokable(Integer vertexID, StreamInvokable<IN, OUT> invokableObject) {
- getVertex(vertexID).setInvokable(invokableObject);
- }
-
- public void setInputFormat(Integer vertexID, InputFormat<String, ?> inputFormat) {
- getVertex(vertexID).setInputFormat(inputFormat);
- }
-
- public StreamNode getVertex(Integer vertexID) {
- return streamNodes.get(vertexID);
- }
-
- protected Collection<? extends Integer> getVertexIDs() {
- return streamNodes.keySet();
- }
-
- protected StreamEdge getEdge(int sourceId, int targetId) {
- Iterator<StreamEdge> outIterator = getVertex(sourceId).getOutEdges().iterator();
- while (outIterator.hasNext()) {
- StreamEdge edge = outIterator.next();
-
- if (edge.getTargetID() == targetId) {
- return edge;
- }
- }
-
- throw new RuntimeException("No such edge in stream graph: " + sourceId + " -> " + targetId);
- }
-
- public Collection<Integer> getSourceIDs() {
- return sources;
- }
-
- public Set<Tuple2<Integer, StreamInvokable<?, ?>>> getInvokables() {
- Set<Tuple2<Integer, StreamInvokable<?, ?>>> invokableSet = new HashSet<Tuple2<Integer, StreamInvokable<?, ?>>>();
- for (StreamNode vertex : streamNodes.values()) {
- invokableSet.add(new Tuple2<Integer, StreamInvokable<?, ?>>(vertex.getID(), vertex
- .getInvokable()));
- }
- return invokableSet;
- }
-
- public Collection<StreamLoop> getStreamLoops() {
- return streamLoops.values();
- }
-
- public Integer getLoopID(Integer vertexID) {
- return vertexIDtoLoop.get(vertexID).getID();
- }
-
- public long getLoopTimeout(Integer vertexID) {
- return vertexIDtoLoop.get(vertexID).getTimeout();
- }
-
- protected void removeEdge(StreamEdge edge) {
-
- edge.getSourceVertex().getOutEdges().remove(edge);
- edge.getTargetVertex().getInEdges().remove(edge);
-
- }
-
- protected void removeVertex(StreamNode toRemove) {
-
- Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>();
-
- edgesToRemove.addAll(toRemove.getInEdges());
- edgesToRemove.addAll(toRemove.getOutEdges());
-
- for (StreamEdge edge : edgesToRemove) {
- removeEdge(edge);
- }
- streamNodes.remove(toRemove.getID());
- }
-
- /**
- * Gets the assembled {@link JobGraph} and adds a default name for it.
- */
- public JobGraph getJobGraph() {
- return getJobGraph(jobName);
- }
-
- /**
- * Gets the assembled {@link JobGraph} and adds a user specified name for
- * it.
- *
- * @param jobGraphName
- * name of the jobGraph
- */
- public JobGraph getJobGraph(String jobGraphName) {
-
- // temporarily forbid checkpointing for iterative jobs
- if (isIterative() && isCheckpointingEnabled()) {
- throw new UnsupportedOperationException(
- "Checkpointing is currently not supported for iterative jobs!");
- }
-
- setJobName(jobGraphName);
-
- WindowingOptimizer.optimizeGraph(this);
-
- StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
-
- return jobgraphGenerator.createJobGraph(jobGraphName);
- }
-
- @Override
- public String getStreamingPlanAsJSON() {
-
- WindowingOptimizer.optimizeGraph(this);
-
- try {
- return new JSONGenerator(this).getJSON();
- } catch (JSONException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("JSON plan creation failed: {}", e);
- }
- return "";
- }
-
- }
-
- @Override
- public void dumpStreamingPlanAsJSON(File file) throws IOException {
- PrintWriter pw = null;
- try {
- pw = new PrintWriter(new FileOutputStream(file), false);
- pw.write(getStreamingPlanAsJSON());
- pw.flush();
-
- } finally {
- if (pw != null) {
- pw.close();
- }
- }
- }
-
- /**
- * Object for representing loops in streaming programs.
- *
- */
- protected static class StreamLoop {
-
- private Integer loopID;
-
- private StreamNode head;
- private StreamNode tail;
-
- private Long timeout;
-
- public StreamLoop(Integer loopID, StreamNode head, Long timeout) {
- this.loopID = loopID;
- this.head = head;
- this.timeout = timeout;
- }
-
- public Integer getID() {
- return loopID;
- }
-
- public Long getTimeout() {
- return timeout;
- }
-
- public void setTail(StreamNode tail) {
- this.tail = tail;
- }
-
- public StreamNode getHead() {
- return head;
- }
-
- public StreamNode getTail() {
- return tail;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamNode.java
deleted file mode 100644
index 86b4782..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamNode.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-
-/**
- * Class representing the operators in the streaming programs, with all their
- * properties.
- *
- */
-public class StreamNode implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- transient private StreamExecutionEnvironment env;
-
- private Integer ID;
- private Integer parallelism = null;
- private Long bufferTimeout = null;
- private String operatorName;
-
- private StreamInvokable<?, ?> invokable;
- private List<OutputSelector<?>> outputSelectors;
- private StreamRecordSerializer<?> typeSerializerIn1;
- private StreamRecordSerializer<?> typeSerializerIn2;
- private StreamRecordSerializer<?> typeSerializerOut;
-
- private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
- private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
-
- private Class<? extends AbstractInvokable> jobVertexClass;
-
- private InputFormat<String, ?> inputFormat;
-
- public StreamNode(StreamExecutionEnvironment env, Integer ID, StreamInvokable<?, ?> invokable,
- String operatorName, List<OutputSelector<?>> outputSelector,
- Class<? extends AbstractInvokable> jobVertexClass) {
- this.env = env;
- this.ID = ID;
- this.operatorName = operatorName;
- this.invokable = invokable;
- this.outputSelectors = outputSelector;
- this.jobVertexClass = jobVertexClass;
- }
-
- public void addInEdge(StreamEdge inEdge) {
- if (inEdge.getTargetID() != getID()) {
- throw new IllegalArgumentException("Destination ID doesn't match the StreamNode ID");
- } else {
- inEdges.add(inEdge);
- }
- }
-
- public void addOutEdge(StreamEdge outEdge) {
- if (outEdge.getSourceID() != getID()) {
- throw new IllegalArgumentException("Source ID doesn't match the StreamNode ID");
- } else {
- outEdges.add(outEdge);
- }
- }
-
- public List<StreamEdge> getOutEdges() {
- return outEdges;
- }
-
- public List<StreamEdge> getInEdges() {
- return inEdges;
- }
-
- public List<Integer> getOutEdgeIndices() {
- List<Integer> outEdgeIndices = new ArrayList<Integer>();
-
- for (StreamEdge edge : outEdges) {
- outEdgeIndices.add(edge.getTargetID());
- }
-
- return outEdgeIndices;
- }
-
- public List<Integer> getInEdgeIndices() {
- List<Integer> inEdgeIndices = new ArrayList<Integer>();
-
- for (StreamEdge edge : inEdges) {
- inEdgeIndices.add(edge.getSourceID());
- }
-
- return inEdgeIndices;
- }
-
- public Integer getID() {
- return ID;
- }
-
- public Integer getParallelism() {
- return parallelism != null ? parallelism : env.getParallelism();
- }
-
- public void setParallelism(Integer parallelism) {
- this.parallelism = parallelism;
- }
-
- public Long getBufferTimeout() {
- return bufferTimeout != null ? bufferTimeout : env.getBufferTimeout();
- }
-
- public void setBufferTimeout(Long bufferTimeout) {
- this.bufferTimeout = bufferTimeout;
- }
-
- public StreamInvokable<?, ?> getInvokable() {
- return invokable;
- }
-
- public void setInvokable(StreamInvokable<?, ?> invokable) {
- this.invokable = invokable;
- }
-
- public String getOperatorName() {
- return operatorName;
- }
-
- public void setOperatorName(String operatorName) {
- this.operatorName = operatorName;
- }
-
- public List<OutputSelector<?>> getOutputSelectors() {
- return outputSelectors;
- }
-
- public OutputSelectorWrapper<?> getOutputSelectorWrapper() {
- return OutputSelectorWrapperFactory.create(getOutputSelectors());
- }
-
- public void addOutputSelector(OutputSelector<?> outputSelector) {
- this.outputSelectors.add(outputSelector);
- }
-
- public StreamRecordSerializer<?> getTypeSerializerIn1() {
- return typeSerializerIn1;
- }
-
- public void setSerializerIn1(StreamRecordSerializer<?> typeSerializerIn1) {
- this.typeSerializerIn1 = typeSerializerIn1;
- }
-
- public StreamRecordSerializer<?> getTypeSerializerIn2() {
- return typeSerializerIn2;
- }
-
- public void setSerializerIn2(StreamRecordSerializer<?> typeSerializerIn2) {
- this.typeSerializerIn2 = typeSerializerIn2;
- }
-
- public StreamRecordSerializer<?> getTypeSerializerOut() {
- return typeSerializerOut;
- }
-
- public void setSerializerOut(StreamRecordSerializer<?> typeSerializerOut) {
- this.typeSerializerOut = typeSerializerOut;
- }
-
- public Class<? extends AbstractInvokable> getJobVertexClass() {
- return jobVertexClass;
- }
-
- public InputFormat<String, ?> getInputFormat() {
- return inputFormat;
- }
-
- public void setInputFormat(InputFormat<String, ?> inputFormat) {
- this.inputFormat = inputFormat;
- }
-
-}