You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:31:00 UTC

[25/30] git commit: Offer buffer-oriented API for I/O (#25)

Offer buffer-oriented API for I/O (#25)

https://github.com/stratosphere/stratosphere/issues/25

The runtime offered a record-oriented API for data transfers, which
* resulted in unnecessary data (de)serialization,
* complicated the upcoming fault tolerance implementation, and
* blocked more efficient implementations of higher-level operators.

With this commit, the runtime offers a buffer-oriented API for the
output side (sending), which is oblivious to records. The
buffer-oriented input side (receiving) is still to be implemented.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2db78a8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2db78a8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2db78a8d

Branch: refs/heads/master
Commit: 2db78a8dc1a4664f3e384005d7e07bea594b835b
Parents: 0bb812e
Author: uce <u....@fu-berlin.de>
Authored: Tue Jan 14 17:45:21 2014 +0100
Committer: StephanEwen <st...@tu-berlin.de>
Committed: Sat Jun 7 09:41:21 2014 +0200

----------------------------------------------------------------------
 .../spargel/java/record/SpargelIteration.java   |   3 +-
 .../plantranslate/NepheleJobGraphGenerator.java |  13 +-
 .../configuration/ConfigConstants.java          |   6 -
 .../stratosphere/core/memory/MemorySegment.java |   5 +
 .../main/java/eu/stratosphere/types/Record.java |   2 +-
 .../eu/stratosphere/nephele/AbstractID.java     | 177 ++++
 .../nephele/annotations/ForceCheckpoint.java    |  30 -
 .../nephele/annotations/Stateful.java           |  28 -
 .../nephele/annotations/Stateless.java          |  28 -
 .../nephele/annotations/TaskAnnotation.java     |  29 -
 .../deployment/ChannelDeploymentDescriptor.java |   2 +-
 .../deployment/GateDeploymentDescriptor.java    |   4 +-
 .../nephele/example/events/EventExample.java    |  95 ---
 .../nephele/example/events/EventReceiver.java   |  49 --
 .../nephele/example/events/EventSender.java     |  68 --
 .../nephele/example/events/MyEventListener.java |  46 --
 .../stratosphere/nephele/example/grep/Grep.java | 108 ---
 .../nephele/example/grep/GrepTask.java          |  47 --
 .../nephele/example/speedtest/SpeedTest.java    | 224 -----
 .../example/speedtest/SpeedTestConsumer.java    |  45 -
 .../example/speedtest/SpeedTestForwarder.java   |  54 --
 .../example/speedtest/SpeedTestProducer.java    |  54 --
 .../example/speedtest/SpeedTestRecord.java      |  55 --
 .../nephele/example/union/ConsumerTask.java     |  44 -
 .../nephele/example/union/ProducerTask.java     |  43 -
 .../nephele/example/union/UnionJob.java         | 104 ---
 .../nephele/example/union/UnionTask.java        |  45 -
 .../nephele/execution/CancelTaskException.java  |  25 +
 .../nephele/execution/Environment.java          |  82 +-
 .../execution/ExecutionStateTransition.java     |  24 +-
 .../nephele/execution/RuntimeEnvironment.java   | 326 +++-----
 .../DistributionPatternProvider.java            |   2 +-
 .../nephele/executiongraph/ExecutionEdge.java   |  11 +-
 .../nephele/executiongraph/ExecutionGate.java   |   4 +-
 .../nephele/executiongraph/ExecutionGraph.java  |  20 +-
 .../executiongraph/ExecutionGroupEdge.java      |  28 +-
 .../executiongraph/ExecutionGroupVertex.java    |  27 +-
 .../nephele/executiongraph/ExecutionStage.java  |   6 +-
 .../nephele/executiongraph/ExecutionVertex.java |  38 +-
 .../executiongraph/ExecutionVertexID.java       |   2 +-
 .../executiongraph/ManagementGraphFactory.java  |   2 +-
 .../nephele/instance/AbstractInstance.java      |  23 +-
 .../nephele/instance/AllocationID.java          |   2 +-
 .../instance/InstanceConnectionInfo.java        |  25 +-
 .../nephele/instance/InstanceID.java            |   2 +-
 .../instance/cluster/ClusterManager.java        |  10 +-
 .../stratosphere/nephele/io/AbstractGate.java   | 148 ----
 .../eu/stratosphere/nephele/io/AbstractID.java  | 214 -----
 .../nephele/io/AbstractRecordReader.java        |  98 ---
 .../nephele/io/AbstractRecordWriter.java        | 140 ----
 .../io/AbstractSingleGateRecordReader.java      |  77 --
 .../nephele/io/AbstractUnionRecordReader.java   | 152 ----
 .../nephele/io/BroadcastRecordWriter.java       |  53 --
 .../nephele/io/ChannelSelector.java             |  39 -
 .../nephele/io/DataOutputBuffer.java            | 165 ----
 .../nephele/io/DefaultChannelSelector.java      |  47 --
 .../nephele/io/DistributionPattern.java         |  33 -
 .../java/eu/stratosphere/nephele/io/Gate.java   | 126 ---
 .../java/eu/stratosphere/nephele/io/GateID.java |  22 -
 .../io/ImmutableRecordDeserializerFactory.java  |  41 -
 .../nephele/io/InputChannelResult.java          |  23 -
 .../eu/stratosphere/nephele/io/InputGate.java   | 136 ----
 .../stratosphere/nephele/io/MutableReader.java  |  32 -
 .../io/MutableRecordDeserializerFactory.java    |  52 --
 .../nephele/io/MutableRecordReader.java         | 119 ---
 .../nephele/io/MutableUnionRecordReader.java    |  37 -
 .../eu/stratosphere/nephele/io/OutputGate.java  | 149 ----
 .../java/eu/stratosphere/nephele/io/Reader.java |  30 -
 .../eu/stratosphere/nephele/io/ReaderBase.java  |  67 --
 .../nephele/io/RecordAvailabilityListener.java  |  35 -
 .../nephele/io/RecordDeserializer.java          |  55 --
 .../nephele/io/RecordDeserializerFactory.java   |  31 -
 .../stratosphere/nephele/io/RecordReader.java   | 153 ----
 .../stratosphere/nephele/io/RecordWriter.java   |  82 --
 .../nephele/io/RuntimeInputGate.java            | 330 --------
 .../nephele/io/RuntimeOutputGate.java           | 333 --------
 .../nephele/io/UnionRecordReader.java           |  67 --
 .../java/eu/stratosphere/nephele/io/Writer.java |  28 -
 .../nephele/io/channels/AbstractChannel.java    | 127 ---
 .../io/channels/AbstractInputChannel.java       | 102 ---
 .../io/channels/AbstractOutputChannel.java      | 111 ---
 .../nephele/io/channels/Buffer.java             | 175 ----
 .../nephele/io/channels/BufferFactory.java      |  32 -
 .../nephele/io/channels/ChannelID.java          |  30 -
 .../nephele/io/channels/ChannelType.java        |  31 -
 .../io/channels/ChannelWithAccessInfo.java      |  58 --
 .../io/channels/ChannelWithPosition.java        |  42 -
 .../io/channels/DefaultDeserializer.java        | 781 ------------------
 .../DistributedChannelWithAccessInfo.java       | 176 ----
 .../io/channels/LocalChannelWithAccessInfo.java | 170 ----
 .../nephele/io/channels/MemoryBuffer.java       | 249 ------
 .../io/channels/MemoryBufferPoolConnector.java  |  32 -
 .../io/channels/MemoryBufferRecycler.java       |  91 ---
 .../io/channels/SerializationBuffer.java        | 141 ----
 .../AbstractByteBufferedInputChannel.java       | 243 ------
 .../AbstractByteBufferedOutputChannel.java      | 255 ------
 .../io/channels/bytebuffered/BufferOrEvent.java |  52 --
 .../ByteBufferedChannelCloseEvent.java          |  35 -
 .../ByteBufferedInputChannelBroker.java         |  36 -
 .../ByteBufferedOutputChannelBroker.java        |  71 --
 .../bytebuffered/EndOfSuperstepEvent.java       |  34 -
 .../bytebuffered/InMemoryInputChannel.java      |  34 -
 .../bytebuffered/InMemoryOutputChannel.java     |  34 -
 .../bytebuffered/NetworkInputChannel.java       |  34 -
 .../bytebuffered/NetworkOutputChannel.java      |  34 -
 .../nephele/io/library/DirectoryReader.java     | 103 ---
 .../nephele/io/library/DirectoryWriter.java     |  67 --
 .../nephele/io/library/FileLineReader.java      |  76 --
 .../nephele/io/library/FileLineWriter.java      |  75 --
 .../eu/stratosphere/nephele/ipc/Client.java     |  43 +-
 .../nephele/jobgraph/AbstractJobVertex.java     |   3 +-
 .../nephele/jobgraph/DistributionPattern.java   |  33 +
 .../stratosphere/nephele/jobgraph/JobEdge.java  |   3 +-
 .../eu/stratosphere/nephele/jobgraph/JobID.java |  69 +-
 .../nephele/jobgraph/JobVertexID.java           |   2 +-
 .../nephele/jobmanager/JobManager.java          | 147 +---
 .../jobmanager/scheduler/AbstractScheduler.java |   2 +-
 .../jobmanager/scheduler/RecoveryLogic.java     |   3 +-
 .../nephele/managementgraph/ManagementEdge.java |   2 +-
 .../managementgraph/ManagementEdgeID.java       |   4 +-
 .../managementgraph/ManagementGateID.java       |   2 +-
 .../managementgraph/ManagementGraph.java        |   2 +-
 .../managementgraph/ManagementGroupEdge.java    |   2 +-
 .../managementgraph/ManagementGroupVertex.java  |   2 +-
 .../ManagementGroupVertexID.java                |   2 +-
 .../managementgraph/ManagementVertexID.java     |   2 +-
 .../nephele/multicast/MulticastCluster.java     | 210 -----
 .../multicast/MulticastForwardingTable.java     |  49 --
 .../nephele/multicast/MulticastManager.java     | 463 -----------
 .../multicast/TopologyInformationSupplier.java  |  30 -
 .../nephele/multicast/TreeNode.java             | 246 ------
 .../nephele/profiling/TaskManagerProfiler.java  |   5 +-
 .../profiling/impl/TaskManagerProfilerImpl.java |  31 +-
 .../protocols/ChannelLookupProtocol.java        |   4 +-
 .../protocols/ExtendedManagementProtocol.java   |  13 -
 .../protocols/TaskOperationProtocol.java        |  14 +-
 .../services/iomanager/BlockChannelAccess.java  |   4 +-
 .../services/memorymanager/MemoryManager.java   |   3 +-
 .../memorymanager/spi/DefaultMemoryManager.java |  17 +-
 .../taskmanager/ExecutorThreadFactory.java      |  35 +
 .../stratosphere/nephele/taskmanager/Task.java  | 257 +++++-
 .../nephele/taskmanager/TaskKillResult.java     |  44 -
 .../nephele/taskmanager/TaskManager.java        |  56 +-
 .../BufferAvailabilityListener.java             |  28 -
 .../bufferprovider/BufferProvider.java          |  82 --
 .../bufferprovider/BufferProviderBroker.java    |  24 -
 .../bufferprovider/GlobalBufferPool.java        | 135 ---
 .../bufferprovider/LocalBufferPool.java         | 287 -------
 .../bufferprovider/LocalBufferPoolOwner.java    |  54 --
 .../AbstractOutputChannelContext.java           |  84 --
 .../AbstractOutputChannelForwarder.java         | 109 ---
 .../ByteBufferedChannelManager.java             | 816 -------------------
 .../bytebuffered/CanceledChannelSet.java        | 211 -----
 .../bytebuffered/ChannelContext.java            |  36 -
 .../ConnectionInfoLookupResponse.java           | 176 ----
 .../taskmanager/bytebuffered/GateContext.java   |  21 -
 .../bytebuffered/IncomingConnection.java        | 125 ---
 .../bytebuffered/IncomingConnectionThread.java  | 223 -----
 .../bytebuffered/InputChannelContext.java       |  21 -
 .../bytebuffered/InputGateContext.java          |  24 -
 .../InsufficientResourcesException.java         |  37 -
 .../bytebuffered/NetworkConnectionManager.java  | 173 ----
 .../bytebuffered/OutgoingConnection.java        | 531 ------------
 .../bytebuffered/OutgoingConnectionThread.java  | 270 ------
 .../bytebuffered/OutputChannelContext.java      |  17 -
 .../OutputChannelForwardingChain.java           |  84 --
 .../bytebuffered/OutputGateContext.java         |  22 -
 .../bytebuffered/ReceiverNotFoundEvent.java     | 169 ----
 .../bytebuffered/RemoteReceiver.java            | 157 ----
 .../bytebuffered/SenderHintEvent.java           | 119 ---
 .../taskmanager/bytebuffered/TaskContext.java   |  24 -
 .../bytebuffered/UnexpectedEnvelopeEvent.java   |  81 --
 .../runtime/ExecutorThreadFactory.java          |  35 -
 .../taskmanager/runtime/ForwardingBarrier.java  |  75 --
 .../taskmanager/runtime/RuntimeDispatcher.java  |  38 -
 .../runtime/RuntimeInputChannelContext.java     | 303 -------
 .../runtime/RuntimeInputGateContext.java        | 183 -----
 .../runtime/RuntimeOutputChannelBroker.java     | 206 -----
 .../runtime/RuntimeOutputChannelContext.java    |  76 --
 .../runtime/RuntimeOutputGateContext.java       | 159 ----
 .../taskmanager/runtime/RuntimeTask.java        | 346 --------
 .../taskmanager/runtime/RuntimeTaskContext.java | 211 -----
 .../transferenvelope/AbstractDeserializer.java  | 355 --------
 .../transferenvelope/AbstractSerializer.java    | 274 -------
 .../CapacityConstrainedArrayQueue.java          | 322 --------
 .../transferenvelope/DefaultDeserializer.java   |  94 ---
 .../transferenvelope/DefaultSerializer.java     |  38 -
 .../NoBufferAvailableException.java             |  53 --
 .../transferenvelope/TransferEnvelope.java      | 165 ----
 .../TransferEnvelopeDispatcher.java             |  37 -
 .../TransferEnvelopeReceiverList.java           |  86 --
 .../nephele/util/BufferPoolConnector.java       |  45 +
 .../eu/stratosphere/nephele/util/TaskUtils.java |  42 -
 .../runtime/iterative/io/FakeOutputTask.java    |   2 +-
 .../iterative/task/IterationHeadPactTask.java   |  16 +-
 .../task/IterationIntermediatePactTask.java     |   7 +-
 .../task/IterationSynchronizationSinkTask.java  |   2 +-
 .../iterative/task/IterationTailPactTask.java   |   1 +
 .../pact/runtime/shipping/OutputCollector.java  |  31 +-
 .../pact/runtime/shipping/OutputEmitter.java    |   2 +-
 .../runtime/shipping/RecordOutputCollector.java |  48 +-
 .../runtime/shipping/RecordOutputEmitter.java   |   2 +
 .../pact/runtime/task/DataSinkTask.java         |  19 +-
 .../pact/runtime/task/DataSourceTask.java       |  24 +-
 .../pact/runtime/task/RegularPactTask.java      | 250 +++---
 .../ExceptionInChainedStubException.java        |   8 +
 .../SynchronousChainedCombineDriver.java        |   2 +
 .../pact/runtime/task/util/ReaderIterator.java  |   2 +-
 .../runtime/task/util/RecordReaderIterator.java |   2 +-
 .../java/eu/stratosphere/runtime/io/Buffer.java |  93 +++
 .../stratosphere/runtime/io/BufferRecycler.java |  26 +
 .../runtime/io/api/AbstractRecordReader.java    |  98 +++
 .../io/api/AbstractSingleGateRecordReader.java  |  69 ++
 .../io/api/AbstractUnionRecordReader.java       | 155 ++++
 .../runtime/io/api/BufferWriter.java            |  67 ++
 .../runtime/io/api/ChannelSelector.java         |  39 +
 .../runtime/io/api/MutableReader.java           |  32 +
 .../runtime/io/api/MutableRecordReader.java     | 120 +++
 .../io/api/MutableUnionRecordReader.java        |  37 +
 .../eu/stratosphere/runtime/io/api/Reader.java  |  30 +
 .../stratosphere/runtime/io/api/ReaderBase.java |  67 ++
 .../runtime/io/api/RecordReader.java            | 154 ++++
 .../runtime/io/api/RecordWriter.java            | 151 ++++
 .../io/api/RoundRobinChannelSelector.java       |  47 ++
 .../runtime/io/api/UnionRecordReader.java       |  67 ++
 .../runtime/io/channels/BufferOrEvent.java      |  52 ++
 .../runtime/io/channels/Channel.java            |  97 +++
 .../runtime/io/channels/ChannelCloseEvent.java  |  33 +
 .../runtime/io/channels/ChannelID.java          |  39 +
 .../runtime/io/channels/ChannelType.java        |  26 +
 .../io/channels/EndOfSuperstepEvent.java        |  34 +
 .../runtime/io/channels/InputChannel.java       | 493 +++++++++++
 .../runtime/io/channels/OutputChannel.java      | 193 +++++
 .../ReceiverAlreadyClosedException.java         |  22 +
 .../eu/stratosphere/runtime/io/gates/Gate.java  | 174 ++++
 .../stratosphere/runtime/io/gates/GateID.java   |  24 +
 .../runtime/io/gates/InputChannelResult.java    |  23 +
 .../runtime/io/gates/InputGate.java             | 384 +++++++++
 .../runtime/io/gates/OutputGate.java            | 165 ++++
 .../io/gates/RecordAvailabilityListener.java    |  36 +
 .../runtime/io/network/ChannelManager.java      | 646 +++++++++++++++
 .../network/ConnectionInfoLookupResponse.java   | 143 ++++
 .../network/InsufficientResourcesException.java |  37 +
 .../LocalReceiverCancelledException.java        |  37 +
 .../io/network/NetworkConnectionManager.java    | 176 ++++
 .../runtime/io/network/RemoteReceiver.java      | 157 ++++
 .../runtime/io/network/SenderHintEvent.java     | 117 +++
 .../BufferAvailabilityListener.java             |  28 +
 .../network/bufferprovider/BufferProvider.java  |  69 ++
 .../bufferprovider/BufferProviderBroker.java    |  24 +
 .../bufferprovider/GlobalBufferPool.java        | 123 +++
 .../network/bufferprovider/LocalBufferPool.java | 306 +++++++
 .../bufferprovider/LocalBufferPoolOwner.java    |  56 ++
 .../bufferprovider/SerialSingleBufferPool.java  |  77 ++
 .../runtime/io/network/envelope/Envelope.java   | 169 ++++
 .../io/network/envelope/EnvelopeDispatcher.java |  46 ++
 .../io/network/envelope/EnvelopeReader.java     | 212 +++++
 .../network/envelope/EnvelopeReceiverList.java  |  75 ++
 .../io/network/envelope/EnvelopeWriter.java     | 134 +++
 .../envelope/NoBufferAvailableException.java    |  53 ++
 .../io/network/tcp/IncomingConnection.java      | 115 +++
 .../network/tcp/IncomingConnectionThread.java   | 226 +++++
 .../io/network/tcp/OutgoingConnection.java      | 529 ++++++++++++
 .../network/tcp/OutgoingConnectionThread.java   | 276 +++++++
 .../AdaptiveSpanningRecordDeserializer.java     | 521 ++++++++++++
 .../io/serialization/DataInputDeserializer.java | 307 +++++++
 .../io/serialization/DataOutputSerializer.java  | 259 ++++++
 .../io/serialization/RecordDeserializer.java    |  56 ++
 .../io/serialization/RecordSerializer.java      |  60 ++
 .../serialization/SpanningRecordSerializer.java | 153 ++++
 .../eu/stratosphere/nephele/AbstractIDTest.java |  62 ++
 .../ChannelDeploymentDescriptorTest.java        |   2 +-
 .../GateDeploymentDescriptorTest.java           |  12 +-
 .../TaskDeploymentDescriptorTest.java           |   2 +-
 .../executiongraph/ExecutionGraphTest.java      |  30 +-
 .../ForwardTask1Input1Output.java               |   9 +-
 .../ForwardTask1Input2Outputs.java              |  14 +-
 .../ForwardTask2Inputs1Output.java              |   9 +-
 .../executiongraph/SelfCrossForwardTask.java    |   6 +-
 .../executiongraph/SelfCrossInputTask.java      |   6 +-
 .../stratosphere/nephele/fs/LineReaderTest.java |  79 --
 .../nephele/fs/s3/S3FileSystemTest.java         | 461 -----------
 .../stratosphere/nephele/io/AbstractIDTest.java |  62 --
 .../nephele/io/DefaultChannelSelectorTest.java  |  46 --
 .../nephele/io/channels/BufferTest.java         | 192 -----
 .../nephele/io/channels/MemoryBufferTest.java   | 155 ----
 .../io/channels/serialization/BooleanType.java  |  77 --
 .../channels/serialization/ByteArrayType.java   |  88 --
 .../serialization/ByteSubArrayType.java         |  98 ---
 .../io/channels/serialization/ByteType.java     |  77 --
 .../io/channels/serialization/CharType.java     |  78 --
 .../serialization/DeSerializerTest.java         | 299 -------
 .../io/channels/serialization/DoubleType.java   |  78 --
 .../io/channels/serialization/FloatType.java    |  77 --
 .../io/channels/serialization/IntType.java      |  77 --
 .../io/channels/serialization/LongType.java     |  77 --
 .../serialization/SerializationTestType.java    |  52 --
 .../io/channels/serialization/ShortType.java    |  77 --
 .../channels/serialization/UTFStringType.java   |  86 --
 .../serialization/UnsignedByteType.java         |  77 --
 .../serialization/UnsignedShortType.java        |  77 --
 .../io/library/FileLineReadWriteTest.java       | 134 ---
 .../nephele/jobmanager/DoubleSourceTask.java    |  11 +-
 .../nephele/jobmanager/DoubleTargetTask.java    |  10 +-
 .../nephele/jobmanager/ExceptionTask.java       |   6 +-
 .../nephele/jobmanager/ForwardTask.java         |  10 +-
 .../nephele/jobmanager/JobManagerITCase.java    |  76 +-
 .../nephele/jobmanager/UnionTask.java           |  12 +-
 .../scheduler/queue/QueueSchedulerTest.java     |  10 +-
 .../managementgraph/ManagementGraphTest.java    |   6 +-
 .../profiling/impl/InstanceProfilerTest.java    |   2 +-
 .../DefaultDeserializerTest.java                | 358 --------
 .../transferenvelope/DefaultSerializerTest.java | 313 -------
 .../nephele/util/BufferPoolConnector.java       |  53 --
 .../nephele/util/DiscardingRecycler.java        |  24 +
 .../nephele/util/FileLineReader.java            |  80 ++
 .../nephele/util/FileLineWriter.java            |  75 ++
 .../nephele/util/TestBufferProvider.java        |  76 ++
 .../pact/runtime/task/DataSinkTaskTest.java     |  36 +-
 .../pact/runtime/task/DataSourceTaskTest.java   |  28 +-
 .../runtime/task/chaining/ChainTaskTest.java    |  11 +-
 .../runtime/task/util/OutputEmitterTest.java    |   1 +
 .../task/util/RecordOutputEmitterTest.java      |   2 +
 .../pact/runtime/test/util/MockEnvironment.java | 169 ++--
 .../pact/runtime/test/util/TaskTestBase.java    |   4 +-
 .../stratosphere/runtime/fs/LineReaderTest.java |  78 ++
 .../runtime/fs/s3/S3FileSystemTest.java         | 460 +++++++++++
 .../runtime/io/DefaultChannelSelectorTest.java  |  47 ++
 .../io/library/FileLineReadWriteTest.java       | 136 ++++
 .../envelope/EnvelopeReaderWriterTest.java      | 394 +++++++++
 .../DataInputOutputSerializerTest.java          | 115 +++
 .../io/serialization/PagedViewsTest.java        | 160 ++++
 .../SpanningRecordSerializationTest.java        | 164 ++++
 .../SpanningRecordSerializerTest.java           | 219 +++++
 .../io/serialization/types/AsciiStringType.java |  77 ++
 .../io/serialization/types/BooleanType.java     |  67 ++
 .../io/serialization/types/ByteArrayType.java   |  76 ++
 .../serialization/types/ByteSubArrayType.java   |  91 +++
 .../io/serialization/types/ByteType.java        |  67 ++
 .../io/serialization/types/CharType.java        |  68 ++
 .../io/serialization/types/DoubleType.java      |  68 ++
 .../io/serialization/types/FloatType.java       |  67 ++
 .../runtime/io/serialization/types/IntType.java |  67 ++
 .../io/serialization/types/LongType.java        |  67 ++
 .../types/SerializationTestType.java            |  26 +
 .../types/SerializationTestTypeFactory.java     |  40 +
 .../io/serialization/types/ShortType.java       |  67 ++
 .../serialization/types/UnsignedByteType.java   |  67 ++
 .../serialization/types/UnsignedShortType.java  |  67 ++
 .../runtime/io/serialization/types/Util.java    |  90 ++
 .../BroadcastVarsNepheleITCase.java             |   4 +-
 .../KMeansIterativeNepheleITCase.java           |   3 +-
 .../test/iterative/nephele/JobGraphUtils.java   |   4 +-
 .../CustomCompensatableDanglingPageRank.java    |  10 +-
 ...mpensatableDanglingPageRankWithCombiner.java |  10 +-
 .../CompensatableDanglingPageRank.java          |  10 +-
 .../query1Util/LineItemFilterTest.java          |   6 +-
 357 files changed, 12871 insertions(+), 20521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/record/SpargelIteration.java b/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/record/SpargelIteration.java
index 74bcaf3..14dcb35 100644
--- a/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/record/SpargelIteration.java
+++ b/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/record/SpargelIteration.java
@@ -155,6 +155,7 @@ public class SpargelIteration {
 
 		@Override
 		public void coGroup(Iterator<Record> messages, Iterator<Record> vertex, Collector<Record> out) throws Exception {
+
 			if (vertex.hasNext()) {
 				Record first = vertex.next();
 				first.getFieldInto(0, vertexKey);
@@ -270,4 +271,4 @@ public class SpargelIteration {
 			this.messagingFunction.postSuperstep();
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index 80ae308..53b4cc1 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -52,8 +52,9 @@ import eu.stratosphere.compiler.plan.WorksetPlanNode;
 import eu.stratosphere.configuration.ConfigConstants;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.configuration.ConfigConstants;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.AbstractJobOutputVertex;
 import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
@@ -1224,7 +1225,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			
 			// connect the fake tail
 			try {
-				rootOfStepFunctionVertex.connectTo(fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+				rootOfStepFunctionVertex.connectTo(fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 			} catch (JobGraphDefinitionException e) {
 				throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task");
 			}
@@ -1267,7 +1268,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		
 			// connect the fake tail
 			try {
-				rootOfTerminationCriterionVertex.connectTo(fakeTailTerminationCriterion, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+				rootOfTerminationCriterionVertex.connectTo(fakeTailTerminationCriterion, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 			} catch (JobGraphDefinitionException e) {
 				throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task for termination criterion");
 			}
@@ -1401,7 +1402,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 					
 					// connect the fake tail
 					try {
-						nextWorksetVertex.connectTo(fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+						nextWorksetVertex.connectTo(fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 					} catch (JobGraphDefinitionException e) {
 						throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task");
 					}
@@ -1440,7 +1441,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 					
 					// connect the fake tail
 					try {
-						solutionDeltaVertex.connectTo(fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+						solutionDeltaVertex.connectTo(fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 					} catch (JobGraphDefinitionException e) {
 						throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task");
 					}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
index 51c0a2f..3b9ba3d 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
@@ -344,12 +344,6 @@ public final class ConfigConstants {
 	 */
 	public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
 	
-	/**
-	 * The config parameter defining whether to use the special multicast logic
-	 * for broadcasts.
-	 */
-	public static final boolean DEFAULT_USE_MULTICAST_FOR_BROADCAST = false;
-	
 	
 	// ------------------------ File System Bahavior ------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-core/src/main/java/eu/stratosphere/core/memory/MemorySegment.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/memory/MemorySegment.java b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/MemorySegment.java
index 2698992..2d63fa4 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/core/memory/MemorySegment.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/MemorySegment.java
@@ -103,6 +103,11 @@ public class MemorySegment {
 	public final boolean isFreed() {
 		return this.memory == null;
 	}
+
+	public final void free() {
+		this.wrapper = null;
+		this.memory = null;
+	}
 	
 	/**
 	 * Gets the size of the memory segment, in bytes. Because segments

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java b/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
index 73671fa..5b06547 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
@@ -70,7 +70,7 @@ public final class Record implements Value, CopyableValue<Record> {
 	
 	private Value[] writeFields;		// the cache for objects into which the binary representations are read
 	
-	private int binaryLen;				// the length of the contents in the binary buffer that is valid
+	public int binaryLen;				// the length of the contents in the binary buffer that is valid
 	
 	private int numFields;				// the number of fields in the record
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
new file mode 100644
index 0000000..476e22a
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
@@ -0,0 +1,177 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.util.StringUtils;
+
+/**
+ * A statistically unique identification number, held as two longs (16 bytes in total).
+ */
+public class AbstractID implements IOReadableWritable {
+
+	/** The size of a long in bytes */
+	private static final int SIZE_OF_LONG = 8;
+
+	/** The size of the ID in bytes (two longs) */
+	public static final int SIZE = 2 * SIZE_OF_LONG;
+
+	/** The upper part of the actual ID */
+	private long upperPart;
+
+	/** The lower part of the actual ID */
+	private long lowerPart;
+
+	/**
+	 * Constructs a new ID from a {@link #SIZE}-byte array (lower part first); other lengths throw IllegalArgumentException.
+	 */
+	public AbstractID(byte[] bytes) {
+		if (bytes.length != SIZE) {
+			throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes");
+		}
+
+		this.lowerPart = byteArrayToLong(bytes, 0);
+		this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG);
+	}
+
+	/**
+	 * Constructs a new abstract ID.
+	 *
+	 * @param lowerPart the lower bytes of the ID
+	 * @param upperPart the higher bytes of the ID
+	 */
+	public AbstractID(long lowerPart, long upperPart) {
+		this.lowerPart = lowerPart;
+		this.upperPart = upperPart;
+	}
+
+	/**
+	 * Creates a new abstract ID from the given one.
+	 * <p>
+	 * The given and the newly created abstract ID will be identical, i.e. a comparison by <code>equals</code> will
+	 * return <code>true</code> and both objects will have the same hash code.
+	 *
+	 * @param id the abstract ID to copy
+	 */
+	public AbstractID(AbstractID id) {
+		this.lowerPart = id.lowerPart;
+		this.upperPart = id.upperPart;
+	}
+
+	/**
+	 * Constructs a new random ID; both parts are drawn independently via {@link #generateRandomLong()}.
+	 */
+	public AbstractID() {
+		this.lowerPart = generateRandomLong();
+		this.upperPart = generateRandomLong();
+	}
+
+	/**
+	 * Generates a random non-negative long by scaling {@link Math#random()} to the range [0, Long.MAX_VALUE).
+	 *
+	 * @return a random long that is never negative (zero is possible)
+	 */
+	protected static long generateRandomLong() {
+		return (long) (Math.random() * Long.MAX_VALUE);
+	}
+
+	/**
+	 * Reads eight bytes from the given array as a big-endian long (the byte at <code>offset</code> is the most significant).
+	 *
+	 * @param ba the byte array to be converted
+	 * @param offset the offset indicating at which byte inside the array the conversion shall begin
+	 * @return the long variable
+	 */
+	private static long byteArrayToLong(byte[] ba, int offset) {
+		long l = 0;
+
+		for (int i = 0; i < SIZE_OF_LONG; ++i) {
+			l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
+		}
+
+		return l;
+	}
+
+	/**
+	 * Writes a long into a byte array as eight big-endian bytes (the inverse of {@link #byteArrayToLong(byte[], int)}).
+	 *
+	 * @param l the long variable to be converted
+	 * @param ba the byte array to store the result of the conversion
+	 * @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored
+	 */
+	private static void longToByteArray(final long l, final byte[] ba, final int offset) {
+		for (int i = 0; i < SIZE_OF_LONG; ++i) {
+			final int shift = i << 3; // i * 8
+			ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
+		}
+	}
+
+	/**
+	 * Overwrites this ID with the lower and upper parts of another ID.
+	 *
+	 * @param src source ID
+	 */
+	public void setID(AbstractID src) {
+		this.lowerPart = src.lowerPart;
+		this.upperPart = src.upperPart;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj instanceof AbstractID) {
+			AbstractID src = (AbstractID) obj;
+			return src.lowerPart == this.lowerPart && src.upperPart == this.upperPart;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return ((int)  this.lowerPart) ^
+				((int) (this.lowerPart >>> 32)) ^
+				((int)  this.upperPart) ^
+				((int) (this.upperPart >>> 32));
+	}
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		this.lowerPart = in.readLong(); // lower part first, mirroring write(DataOutput)
+		this.upperPart = in.readLong();
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeLong(this.lowerPart);
+		out.writeLong(this.upperPart);
+	}
+
+	public void write(ByteBuffer buffer) { // same field order as write(DataOutput): lower part, then upper part
+		buffer.putLong(this.lowerPart);
+		buffer.putLong(this.upperPart);
+	}
+
+	@Override
+	public String toString() {
+		final byte[] ba = new byte[SIZE];
+		longToByteArray(this.lowerPart, ba, 0); // lower part occupies bytes 0-7, matching the byte[] constructor
+		longToByteArray(this.upperPart, ba, SIZE_OF_LONG);
+		return StringUtils.byteToHexString(ba);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/ForceCheckpoint.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/ForceCheckpoint.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/ForceCheckpoint.java
deleted file mode 100644
index 012dc02..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/ForceCheckpoint.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.annotations;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation to force a checkpoint decision.
- * 
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-public @interface ForceCheckpoint {
-
-	boolean checkpoint();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateful.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateful.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateful.java
deleted file mode 100644
index 433478b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateful.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.annotations;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * This annotation can be used to indicate a stateful task.
- * 
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-public @interface Stateful {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateless.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateless.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateless.java
deleted file mode 100644
index b3d5e38..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateless.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.annotations;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * This annotation can be used to indicate a stateless task.
- * 
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-public @interface Stateless {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/TaskAnnotation.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/TaskAnnotation.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/TaskAnnotation.java
deleted file mode 100644
index 317b112..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/TaskAnnotation.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.annotations;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.CONSTRUCTOR)
-public @interface TaskAnnotation {
-
-	boolean statefull();
-	double selektivity() default 1;
-	
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
index c39aad0..4068e5b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
@@ -18,7 +18,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
 
 /**
  * A channel deployment descriptor contains all the information necessary to deploy either an input or an output channel

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
index 8a78154..02d6578 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
@@ -21,8 +21,8 @@ import java.util.Iterator;
 import java.util.List;
 
 import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.util.EnumUtils;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventExample.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventExample.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventExample.java
deleted file mode 100644
index 4e6b2e4..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventExample.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-/*
- *  Copyright 2010 casp.
- * 
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- * 
- *       http://www.apache.org/licenses/LICENSE-2.0
- * 
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *  under the License.
- */
-package eu.stratosphere.nephele.example.events;
-
-import java.io.IOException;
-
-import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.client.JobClient;
-import eu.stratosphere.nephele.client.JobSubmissionResult;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.library.FileLineReader;
-import eu.stratosphere.nephele.io.library.FileLineWriter;
-import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-
-/**
- */
-public class EventExample {
-
-	public static void main(String[] args) {
-
-		JobGraph jobGraph = new JobGraph("Grep Example Job");
-
-		JobFileInputVertex input = new JobFileInputVertex("Input 1", jobGraph);
-		input.setFileInputClass(FileLineReader.class);
-		input.setFilePath(new Path("file:///Users/casp/test2.txt"));
-
-		JobTaskVertex task1 = new JobTaskVertex("Task 1", jobGraph);
-		task1.setTaskClass(EventSender.class);
-
-		JobTaskVertex task2 = new JobTaskVertex("Task 2", jobGraph);
-		task2.setTaskClass(EventReceiver.class);
-
-		JobFileOutputVertex output = new JobFileOutputVertex("Output 1", jobGraph);
-		output.setFileOutputClass(FileLineWriter.class);
-		output.setFilePath(new Path("file:///Users/casp/output.txt"));
-
-		jobGraph.addJar(new Path("file:///Users/casp/EventTask.jar"));
-		jobGraph.addJar(new Path("file:///Users/casp/StringTaskEvent.jar"));
-		try {
-
-			input.connectTo(task1, ChannelType.INMEMORY);
-			task1.connectTo(task2, ChannelType.INMEMORY);
-			task2.connectTo(output, ChannelType.INMEMORY);
-
-		} catch (JobGraphDefinitionException e) {
-			e.printStackTrace();
-		}
-
-		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1");
-		conf.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, "6023");
-
-		try {
-			JobClient jobClient = new JobClient(jobGraph, conf);
-			JobSubmissionResult result = jobClient.submitJob();
-			System.out.println(result.getDescription());
-		} catch (IOException ioe) {
-			ioe.printStackTrace();
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventReceiver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventReceiver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventReceiver.java
deleted file mode 100644
index 3e5d322..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventReceiver.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.events;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.event.task.StringTaskEvent;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- */
-public class EventReceiver extends AbstractTask {
-
-	// this is just a dummy input gate...
-	private RecordReader<StringRecord> input = null;
-
-	private RecordWriter<StringRecord> output = null;
-
-	@Override
-	public void registerInputOutput() {
-		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
-		this.input.subscribeToEvent(new MyEventListener(), StringTaskEvent.class);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-
-		while (this.input.hasNext()) {
-
-			StringRecord s = input.next();
-			this.output.emit(s);
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventSender.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventSender.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventSender.java
deleted file mode 100644
index 4961dac..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventSender.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.events;
-
-/*
- *  Copyright 2010 casp.
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *  under the License.
- */
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.event.task.StringTaskEvent;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- */
-public class EventSender extends AbstractTask {
-
-	// this is just a dummy output gate...
-	private RecordReader<StringRecord> input = null;
-
-	private RecordWriter<StringRecord> output = null;
-
-	@Override
-	public void registerInputOutput() {
-		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-
-		int i = 1;
-		while (this.input.hasNext()) {
-			i++;
-			if (i % 1000 == 0) {
-				this.output.publishEvent(new StringTaskEvent("this is the " + i + "th message"));
-			}
-			StringRecord s = input.next();
-			this.output.emit(s);
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/MyEventListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/MyEventListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/MyEventListener.java
deleted file mode 100644
index f285d44..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/MyEventListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-/*
- *  Copyright 2010 casp.
- * 
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- * 
- *       http://www.apache.org/licenses/LICENSE-2.0
- * 
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *  under the License.
- */
-package eu.stratosphere.nephele.example.events;
-
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-import eu.stratosphere.nephele.event.task.StringTaskEvent;
-
-/**
- */
-public class MyEventListener implements EventListener {
-
-	@Override
-	public void eventOccurred(AbstractTaskEvent event) {
-		if (event instanceof StringTaskEvent) {
-			System.out.println("Message Event received: " + ((StringTaskEvent) event).getString());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/Grep.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/Grep.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/Grep.java
deleted file mode 100644
index 9afef9b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/Grep.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.grep;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.client.JobClient;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.library.FileLineReader;
-import eu.stratosphere.nephele.io.library.FileLineWriter;
-import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.util.JarFileCreator;
-
-public class Grep {
-
-	public static void main(String[] args) {
-
-		JobGraph jobGraph = new JobGraph("Grep Example Job");
-
-		JobFileInputVertex input = new JobFileInputVertex("Input 1", jobGraph);
-		input.setFileInputClass(FileLineReader.class);
-		input.setFilePath(new Path("file:///home/ec2-user/test.txt"));
-		input.setInstanceType("t1.micro");
-		
-		JobTaskVertex task1 = new JobTaskVertex("Task 1", jobGraph);
-		task1.setTaskClass(GrepTask.class);
-		task1.setInstanceType("t1.micro");
-
-		
-		JobFileOutputVertex output = new JobFileOutputVertex("Output 1", jobGraph);
-		output.setFileOutputClass(FileLineWriter.class);
-		output.setFilePath(new Path("file:///tmp/"));
-		output.setInstanceType("t1.micro");
-
-		try {
-
-			input.connectTo(task1, ChannelType.INMEMORY);
-			task1.connectTo(output, ChannelType.INMEMORY);
-
-		} catch (JobGraphDefinitionException e) {
-			e.printStackTrace();
-		}
-
-		// Create jar file and attach it
-		final File jarFile = new File("/tmp/grepJob.jar");
-		final JarFileCreator jarFileCreator = new JarFileCreator(jarFile);
-		jarFileCreator.addClass(GrepTask.class);
-
-		try {
-			jarFileCreator.createJarFile();
-			System.out.println("done creating!!");
-		} catch (IOException ioe) {
-
-			if (jarFile.exists()) {
-				jarFile.delete();
-			}
-
-			System.out.println("ERROR creating jar");
-			return;
-		}
-
-		jobGraph.addJar(new Path("file://" + jarFile.getAbsolutePath()));
-
-		// Submit job
-		Configuration conf = new Configuration();
-
-		jobGraph.getJobConfiguration().setString("job.cloud.awsaccessid", "xxx");
-		jobGraph.getJobConfiguration().setString("job.cloud.awssecretkey", "xxx");
-		jobGraph.getJobConfiguration().setString("job.cloud.sshkeypair", "caspeu");
-		jobGraph.getJobConfiguration().setString("job.ec2.image.id", "ami-d64474a2");		
-
-		InetSocketAddress jobmanager = new InetSocketAddress("127.0.0.1", 6123);
-		
-		
-		try {
-			final JobClient jobClient = new JobClient(jobGraph, conf, jobmanager);
-			System.out.println("submitting");
-			jobClient.submitJobAndWait();
-			System.out.println("done.");
-		} catch (Exception e) {
-			System.out.println(e);
-		}
-
-		if (jarFile.exists()) {
-			jarFile.delete();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/GrepTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/GrepTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/GrepTask.java
deleted file mode 100644
index ae72f1c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/GrepTask.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.grep;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-public class GrepTask extends AbstractTask {
-
-	private RecordReader<StringRecord> input = null;
-
-	private RecordWriter<StringRecord> output = null;
-
-	private int i = 0;
-
-	@Override
-	public void invoke() throws Exception {
-
-		while (this.input.hasNext()) {
-
-			StringRecord s = input.next();
-			this.output.emit(s);
-			i++;
-		}
-
-		System.out.println("GREP: Emmited all " + i + " records");
-	}
-
-	@Override
-	public void registerInputOutput() {
-		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTest.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTest.java
deleted file mode 100644
index f85024c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.speedtest;
-
-import java.io.File;
-import java.io.IOException;
-
-import eu.stratosphere.api.common.JobExecutionResult;
-import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.client.JobClient;
-import eu.stratosphere.nephele.client.JobExecutionException;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGenericOutputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.util.JarFileCreator;
-
-/**
- * This class implements a speed test for Nephele. It's primary purpose is to benchmark the performance of Nephele's
- * network channels with different degrees of parallelism.
- * 
- */
-public final class SpeedTest {
-
-	/**
-	 * Configuration key to specify the amount of data to be send in GB.
-	 */
-	static final String DATA_VOLUME_CONFIG_KEY = "data.volume";
-
-	/**
-	 * Entry point to the application.
-	 * 
-	 * @param args
-	 *        the provided arguments
-	 */
-	public static void main(final String[] args) {
-
-		// Parse the arguments first
-		if (args.length < 4) {
-			System.err
-				.println("Insufficient number of arguments. Please provide <job manager address> <amount of data to send in GB> <number of subtasks> <number of subtasks per task> (<use forwarder>)");
-			System.exit(1);
-			return;
-		}
-
-		// Determine the job manager address
-		final String jobManagerAddress = args[0];
-
-		// Determine amount of data to send in GB
-		int amountOfDataToSend = 0;
-		try {
-			amountOfDataToSend = Integer.parseInt(args[1]);
-		} catch (NumberFormatException e) {
-			System.err.println("Cannot parse amount of data to send. Please provide a positive integer value.");
-			System.exit(1);
-			return;
-		}
-
-		if (amountOfDataToSend <= 0 || amountOfDataToSend > 1024) {
-			System.err
-				.println("Please provide an integer value between 1 and 1024 indicating the amount of data to send in GB.");
-			System.exit(1);
-			return;
-		}
-
-		// Determine the number of subtasks
-		int numberOfSubtasks = 0;
-		try {
-			numberOfSubtasks = Integer.parseInt(args[2]);
-		} catch (NumberFormatException e) {
-			System.err.println("Cannot parse the number of subtasks. Please provide a positive integer value.");
-			System.exit(1);
-			return;
-		}
-
-		if (numberOfSubtasks <= 0) {
-			System.err.println("Please provide a positive integer value indicating the number of subtasks.");
-			System.exit(1);
-			return;
-		}
-
-		// Determine the number of subtasks per instance
-		int numberOfSubtasksPerInstance = 0;
-		try {
-			numberOfSubtasksPerInstance = Integer.parseInt(args[3]);
-		} catch (NumberFormatException e) {
-			System.err
-				.println("Cannot parse the number of subtasks per instance. Please provide a positive integer value.");
-			System.exit(1);
-			return;
-		}
-
-		if (numberOfSubtasksPerInstance <= 0) {
-			System.err
-				.println("Please provide a positive integer value indicating the number of subtasks per instance.");
-			System.exit(1);
-			return;
-		}
-
-		// Determine whether to use a forwarder or not
-		boolean useForwarder = false;
-		if (args.length >= 5) {
-			useForwarder = Boolean.parseBoolean(args[4]);
-		}
-
-		final JobGraph jobGraph = new JobGraph("Nephele Speed Test");
-
-		final JobGenericInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
-		producer.setInputClass(SpeedTestProducer.class);
-		producer.setNumberOfSubtasks(numberOfSubtasks);
-		producer.setNumberOfSubtasksPerInstance(numberOfSubtasksPerInstance);
-		producer.getConfiguration().setInteger(DATA_VOLUME_CONFIG_KEY, amountOfDataToSend);
-
-		JobTaskVertex forwarder = null;
-		if (useForwarder) {
-			forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
-			forwarder.setTaskClass(SpeedTestForwarder.class);
-			forwarder.setNumberOfSubtasks(numberOfSubtasks);
-			forwarder.setNumberOfSubtasksPerInstance(numberOfSubtasksPerInstance);
-		}
-
-		final JobGenericOutputVertex consumer = new JobGenericOutputVertex("Speed Test Consumer", jobGraph);
-		consumer.setOutputClass(SpeedTestConsumer.class);
-		consumer.setNumberOfSubtasks(numberOfSubtasks);
-		consumer.setNumberOfSubtasksPerInstance(numberOfSubtasksPerInstance);
-
-		// Set vertex sharing
-		producer.setVertexToShareInstancesWith(consumer);
-		if (forwarder != null) {
-			forwarder.setVertexToShareInstancesWith(consumer);
-		}
-
-		// Connect the vertices
-		try {
-			if (forwarder == null) {
-				producer.connectTo(consumer, ChannelType.NETWORK,
-					DistributionPattern.BIPARTITE);
-			} else {
-				producer.connectTo(forwarder, ChannelType.NETWORK,
-					DistributionPattern.BIPARTITE);
-				forwarder.connectTo(consumer, ChannelType.NETWORK,
-					DistributionPattern.BIPARTITE);
-			}
-
-		} catch (JobGraphDefinitionException e) {
-			e.printStackTrace();
-			System.exit(1);
-			return;
-		}
-
-		File jarFile = null;
-
-		try {
-
-			// Create jar file of job
-			jarFile = File.createTempFile("speedtest", "jar");
-			jarFile.deleteOnExit();
-
-			final JarFileCreator jfc = new JarFileCreator(jarFile);
-			jfc.addClass(SpeedTest.class);
-			jfc.addClass(SpeedTestProducer.class);
-			jfc.addClass(SpeedTestForwarder.class);
-			jfc.addClass(SpeedTestConsumer.class);
-			jfc.addClass(SpeedTestRecord.class);
-			jfc.createJarFile();
-
-			jobGraph.addJar(new Path("file://" + jarFile.getAbsolutePath()));
-
-			final Configuration clientConfiguration = new Configuration();
-			clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress);
-			clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-
-			final JobClient jobClient = new JobClient(jobGraph, clientConfiguration);
-			final JobExecutionResult jobResult = jobClient.submitJobAndWait();
-			final long executionTime = jobResult.getNetRuntime();
-
-			// Calculate throughput in MBit/s and output it
-			System.out.print("Job finished with a throughput of " + toMBitPerSecond(amountOfDataToSend, executionTime));
-
-		} catch (IOException ioe) {
-			ioe.printStackTrace();
-			System.exit(1);
-			return;
-		} catch (JobExecutionException jee) {
-			jee.printStackTrace();
-			System.exit(1);
-			return;
-		}
-	}
-
-	/**
-	 * Calculates the throughput in MBit/s from the given amount of data that has been sent and the execution time.
-	 * 
-	 * @param amountOfDataToSend
-	 *        the amount of data that has been sent in GB
-	 * @param executionTime
-	 *        the execution time in milliseconds
-	 * @return the resulting throughput in MBit/s
-	 */
-	private static int toMBitPerSecond(final int amountOfDataToSend, final long executionTime) {
-
-		final double dataVolumeInMBit = amountOfDataToSend * 8192.0;
-		final double executionTimeInSeconds = executionTime / 1000.0;
-
-		return (int) Math.round(dataVolumeInMBit / executionTimeInSeconds);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestConsumer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestConsumer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestConsumer.java
deleted file mode 100644
index eb40196..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestConsumer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.speedtest;
-
-import eu.stratosphere.nephele.io.MutableRecordReader;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-
-/**
- * This class implements the consumer task of the speed test. The consumer task simply drops all received records.
- * 
- */
-public class SpeedTestConsumer extends AbstractOutputTask {
-
-	/**
-	 * The record reader used to read the incoming records.
-	 */
-	private MutableRecordReader<SpeedTestRecord> input;
-
-
-	@Override
-	public void registerInputOutput() {
-
-		this.input = new MutableRecordReader<SpeedTestRecord>(this);
-	}
-
-
-	@Override
-	public void invoke() throws Exception {
-
-		final SpeedTestRecord record = new SpeedTestRecord();
-		while (this.input.next(record)) {
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestForwarder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestForwarder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestForwarder.java
deleted file mode 100644
index 91090a8..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestForwarder.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.speedtest;
-
-import eu.stratosphere.nephele.io.MutableRecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- * This class implements a forwarder task for the speed test. The forwarder task immediately outputs every record it
- * reads.
- * 
- */
-public final class SpeedTestForwarder extends AbstractTask {
-
-	/**
-	 * The record reader used to read incoming records.
-	 */
-	private MutableRecordReader<SpeedTestRecord> input;
-
-	/**
-	 * The record writer used to forward incoming records.
-	 */
-	private RecordWriter<SpeedTestRecord> output;
-
-
-	@Override
-	public void registerInputOutput() {
-
-		this.input = new MutableRecordReader<SpeedTestRecord>(this);
-		this.output = new RecordWriter<SpeedTestRecord>(this, SpeedTestRecord.class);
-	}
-
-
-	@Override
-	public void invoke() throws Exception {
-
-		final SpeedTestRecord record = new SpeedTestRecord();
-		while (this.input.next(record)) {
-			this.output.emit(record);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestProducer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestProducer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestProducer.java
deleted file mode 100644
index 78ea23d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestProducer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.speedtest;
-
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractGenericInputTask;
-
-/**
- * This class implements the producer task which produces test records for the speed test.
- * 
- */
-public final class SpeedTestProducer extends AbstractGenericInputTask {
-
-	/**
-	 * The record writer to emit the produced records.
-	 */
-	private RecordWriter<SpeedTestRecord> writer;
-
-
-	@Override
-	public void registerInputOutput() {
-
-		this.writer = new RecordWriter<SpeedTestRecord>(this, SpeedTestRecord.class);
-	}
-
-
-	@Override
-	public void invoke() throws Exception {
-
-		// Determine the amount of data to send per subtask
-		final int dataVolumePerSubtaskInMB = getTaskConfiguration().getInteger(SpeedTest.DATA_VOLUME_CONFIG_KEY, 1)
-			* 1024 / getCurrentNumberOfSubtasks();
-
-		final long numberOfRecordsToEmit = dataVolumePerSubtaskInMB * 1024 * 1024 / SpeedTestRecord.RECORD_SIZE;
-
-		final SpeedTestRecord record = new SpeedTestRecord();
-
-		for (long i = 0; i < numberOfRecordsToEmit; ++i) {
-			this.writer.emit(record);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestRecord.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestRecord.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestRecord.java
deleted file mode 100644
index b3522f3..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestRecord.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.speedtest;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * This class implements the record type used for the speed test.
- */
-public final class SpeedTestRecord implements IOReadableWritable {
-
-	/**
-	 * The size of a single record in bytes.
-	 */
-	static final int RECORD_SIZE = 128;
-
-	/**
-	 * The byte buffer which actually stored the record's data.
-	 */
-	private final byte[] buf = new byte[RECORD_SIZE];
-
-	/**
-	 * Constructs a new record and initializes it.
-	 */
-	public SpeedTestRecord() {
-		for (int i = 0; i < RECORD_SIZE; ++i) {
-			this.buf[i] = (byte) (i % 128);
-		}
-	}
-
-	@Override
-	public void write(final DataOutput out) throws IOException {
-		out.write(this.buf);
-	}
-
-	@Override
-	public void read(final DataInput in) throws IOException {
-		in.readFully(this.buf);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ConsumerTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ConsumerTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ConsumerTask.java
deleted file mode 100644
index 209a81e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ConsumerTask.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.union;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.template.AbstractFileOutputTask;
-
-public class ConsumerTask extends AbstractFileOutputTask {
-
-	private RecordReader<StringRecord> input;
-
-	@Override
-	public void registerInputOutput() {
-
-		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-
-	}
-
-	@Override
-	public void invoke() throws Exception {
-
-		int count = 0;
-
-		while (this.input.hasNext()) {
-			this.input.next();
-			++count;
-		}
-
-		System.out.println("Consumer receiver " + count + " records in total");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ProducerTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ProducerTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ProducerTask.java
deleted file mode 100644
index 647a07b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ProducerTask.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.union;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-
-public class ProducerTask extends AbstractFileInputTask {
-
-	private static final int NUMBER_OF_RECORDS_TO_PRODUCE = 1000000;
-
-	private RecordWriter<StringRecord> output;
-
-	@Override
-	public void registerInputOutput() {
-
-		this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-
-		for (int i = 0; i < NUMBER_OF_RECORDS_TO_PRODUCE; ++i) {
-
-			final StringRecord record = new StringRecord("Record " + i + " of " + this);
-			this.output.emit(record);
-		}
-
-	}
-
-}