You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:31:00 UTC
[25/30] git commit: Offer buffer-oriented API for I/O (#25)
Offer buffer-oriented API for I/O (#25)
https://github.com/stratosphere/stratosphere/issues/25
The runtime offered a record-oriented API for data transfers, which
* resulted in unnecessary data (de)serialization,
* complicated the upcoming fault tolerance implementation, and
* blocked more efficient implementations of higher-level operators.
With this commit, the runtime offers a buffer-oriented API for the
output side (sending), which is oblivious to records. The buffer
oriented input side (receiving) is still to be implemented.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2db78a8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2db78a8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2db78a8d
Branch: refs/heads/master
Commit: 2db78a8dc1a4664f3e384005d7e07bea594b835b
Parents: 0bb812e
Author: uce <u....@fu-berlin.de>
Authored: Tue Jan 14 17:45:21 2014 +0100
Committer: StephanEwen <st...@tu-berlin.de>
Committed: Sat Jun 7 09:41:21 2014 +0200
----------------------------------------------------------------------
.../spargel/java/record/SpargelIteration.java | 3 +-
.../plantranslate/NepheleJobGraphGenerator.java | 13 +-
.../configuration/ConfigConstants.java | 6 -
.../stratosphere/core/memory/MemorySegment.java | 5 +
.../main/java/eu/stratosphere/types/Record.java | 2 +-
.../eu/stratosphere/nephele/AbstractID.java | 177 ++++
.../nephele/annotations/ForceCheckpoint.java | 30 -
.../nephele/annotations/Stateful.java | 28 -
.../nephele/annotations/Stateless.java | 28 -
.../nephele/annotations/TaskAnnotation.java | 29 -
.../deployment/ChannelDeploymentDescriptor.java | 2 +-
.../deployment/GateDeploymentDescriptor.java | 4 +-
.../nephele/example/events/EventExample.java | 95 ---
.../nephele/example/events/EventReceiver.java | 49 --
.../nephele/example/events/EventSender.java | 68 --
.../nephele/example/events/MyEventListener.java | 46 --
.../stratosphere/nephele/example/grep/Grep.java | 108 ---
.../nephele/example/grep/GrepTask.java | 47 --
.../nephele/example/speedtest/SpeedTest.java | 224 -----
.../example/speedtest/SpeedTestConsumer.java | 45 -
.../example/speedtest/SpeedTestForwarder.java | 54 --
.../example/speedtest/SpeedTestProducer.java | 54 --
.../example/speedtest/SpeedTestRecord.java | 55 --
.../nephele/example/union/ConsumerTask.java | 44 -
.../nephele/example/union/ProducerTask.java | 43 -
.../nephele/example/union/UnionJob.java | 104 ---
.../nephele/example/union/UnionTask.java | 45 -
.../nephele/execution/CancelTaskException.java | 25 +
.../nephele/execution/Environment.java | 82 +-
.../execution/ExecutionStateTransition.java | 24 +-
.../nephele/execution/RuntimeEnvironment.java | 326 +++-----
.../DistributionPatternProvider.java | 2 +-
.../nephele/executiongraph/ExecutionEdge.java | 11 +-
.../nephele/executiongraph/ExecutionGate.java | 4 +-
.../nephele/executiongraph/ExecutionGraph.java | 20 +-
.../executiongraph/ExecutionGroupEdge.java | 28 +-
.../executiongraph/ExecutionGroupVertex.java | 27 +-
.../nephele/executiongraph/ExecutionStage.java | 6 +-
.../nephele/executiongraph/ExecutionVertex.java | 38 +-
.../executiongraph/ExecutionVertexID.java | 2 +-
.../executiongraph/ManagementGraphFactory.java | 2 +-
.../nephele/instance/AbstractInstance.java | 23 +-
.../nephele/instance/AllocationID.java | 2 +-
.../instance/InstanceConnectionInfo.java | 25 +-
.../nephele/instance/InstanceID.java | 2 +-
.../instance/cluster/ClusterManager.java | 10 +-
.../stratosphere/nephele/io/AbstractGate.java | 148 ----
.../eu/stratosphere/nephele/io/AbstractID.java | 214 -----
.../nephele/io/AbstractRecordReader.java | 98 ---
.../nephele/io/AbstractRecordWriter.java | 140 ----
.../io/AbstractSingleGateRecordReader.java | 77 --
.../nephele/io/AbstractUnionRecordReader.java | 152 ----
.../nephele/io/BroadcastRecordWriter.java | 53 --
.../nephele/io/ChannelSelector.java | 39 -
.../nephele/io/DataOutputBuffer.java | 165 ----
.../nephele/io/DefaultChannelSelector.java | 47 --
.../nephele/io/DistributionPattern.java | 33 -
.../java/eu/stratosphere/nephele/io/Gate.java | 126 ---
.../java/eu/stratosphere/nephele/io/GateID.java | 22 -
.../io/ImmutableRecordDeserializerFactory.java | 41 -
.../nephele/io/InputChannelResult.java | 23 -
.../eu/stratosphere/nephele/io/InputGate.java | 136 ----
.../stratosphere/nephele/io/MutableReader.java | 32 -
.../io/MutableRecordDeserializerFactory.java | 52 --
.../nephele/io/MutableRecordReader.java | 119 ---
.../nephele/io/MutableUnionRecordReader.java | 37 -
.../eu/stratosphere/nephele/io/OutputGate.java | 149 ----
.../java/eu/stratosphere/nephele/io/Reader.java | 30 -
.../eu/stratosphere/nephele/io/ReaderBase.java | 67 --
.../nephele/io/RecordAvailabilityListener.java | 35 -
.../nephele/io/RecordDeserializer.java | 55 --
.../nephele/io/RecordDeserializerFactory.java | 31 -
.../stratosphere/nephele/io/RecordReader.java | 153 ----
.../stratosphere/nephele/io/RecordWriter.java | 82 --
.../nephele/io/RuntimeInputGate.java | 330 --------
.../nephele/io/RuntimeOutputGate.java | 333 --------
.../nephele/io/UnionRecordReader.java | 67 --
.../java/eu/stratosphere/nephele/io/Writer.java | 28 -
.../nephele/io/channels/AbstractChannel.java | 127 ---
.../io/channels/AbstractInputChannel.java | 102 ---
.../io/channels/AbstractOutputChannel.java | 111 ---
.../nephele/io/channels/Buffer.java | 175 ----
.../nephele/io/channels/BufferFactory.java | 32 -
.../nephele/io/channels/ChannelID.java | 30 -
.../nephele/io/channels/ChannelType.java | 31 -
.../io/channels/ChannelWithAccessInfo.java | 58 --
.../io/channels/ChannelWithPosition.java | 42 -
.../io/channels/DefaultDeserializer.java | 781 ------------------
.../DistributedChannelWithAccessInfo.java | 176 ----
.../io/channels/LocalChannelWithAccessInfo.java | 170 ----
.../nephele/io/channels/MemoryBuffer.java | 249 ------
.../io/channels/MemoryBufferPoolConnector.java | 32 -
.../io/channels/MemoryBufferRecycler.java | 91 ---
.../io/channels/SerializationBuffer.java | 141 ----
.../AbstractByteBufferedInputChannel.java | 243 ------
.../AbstractByteBufferedOutputChannel.java | 255 ------
.../io/channels/bytebuffered/BufferOrEvent.java | 52 --
.../ByteBufferedChannelCloseEvent.java | 35 -
.../ByteBufferedInputChannelBroker.java | 36 -
.../ByteBufferedOutputChannelBroker.java | 71 --
.../bytebuffered/EndOfSuperstepEvent.java | 34 -
.../bytebuffered/InMemoryInputChannel.java | 34 -
.../bytebuffered/InMemoryOutputChannel.java | 34 -
.../bytebuffered/NetworkInputChannel.java | 34 -
.../bytebuffered/NetworkOutputChannel.java | 34 -
.../nephele/io/library/DirectoryReader.java | 103 ---
.../nephele/io/library/DirectoryWriter.java | 67 --
.../nephele/io/library/FileLineReader.java | 76 --
.../nephele/io/library/FileLineWriter.java | 75 --
.../eu/stratosphere/nephele/ipc/Client.java | 43 +-
.../nephele/jobgraph/AbstractJobVertex.java | 3 +-
.../nephele/jobgraph/DistributionPattern.java | 33 +
.../stratosphere/nephele/jobgraph/JobEdge.java | 3 +-
.../eu/stratosphere/nephele/jobgraph/JobID.java | 69 +-
.../nephele/jobgraph/JobVertexID.java | 2 +-
.../nephele/jobmanager/JobManager.java | 147 +---
.../jobmanager/scheduler/AbstractScheduler.java | 2 +-
.../jobmanager/scheduler/RecoveryLogic.java | 3 +-
.../nephele/managementgraph/ManagementEdge.java | 2 +-
.../managementgraph/ManagementEdgeID.java | 4 +-
.../managementgraph/ManagementGateID.java | 2 +-
.../managementgraph/ManagementGraph.java | 2 +-
.../managementgraph/ManagementGroupEdge.java | 2 +-
.../managementgraph/ManagementGroupVertex.java | 2 +-
.../ManagementGroupVertexID.java | 2 +-
.../managementgraph/ManagementVertexID.java | 2 +-
.../nephele/multicast/MulticastCluster.java | 210 -----
.../multicast/MulticastForwardingTable.java | 49 --
.../nephele/multicast/MulticastManager.java | 463 -----------
.../multicast/TopologyInformationSupplier.java | 30 -
.../nephele/multicast/TreeNode.java | 246 ------
.../nephele/profiling/TaskManagerProfiler.java | 5 +-
.../profiling/impl/TaskManagerProfilerImpl.java | 31 +-
.../protocols/ChannelLookupProtocol.java | 4 +-
.../protocols/ExtendedManagementProtocol.java | 13 -
.../protocols/TaskOperationProtocol.java | 14 +-
.../services/iomanager/BlockChannelAccess.java | 4 +-
.../services/memorymanager/MemoryManager.java | 3 +-
.../memorymanager/spi/DefaultMemoryManager.java | 17 +-
.../taskmanager/ExecutorThreadFactory.java | 35 +
.../stratosphere/nephele/taskmanager/Task.java | 257 +++++-
.../nephele/taskmanager/TaskKillResult.java | 44 -
.../nephele/taskmanager/TaskManager.java | 56 +-
.../BufferAvailabilityListener.java | 28 -
.../bufferprovider/BufferProvider.java | 82 --
.../bufferprovider/BufferProviderBroker.java | 24 -
.../bufferprovider/GlobalBufferPool.java | 135 ---
.../bufferprovider/LocalBufferPool.java | 287 -------
.../bufferprovider/LocalBufferPoolOwner.java | 54 --
.../AbstractOutputChannelContext.java | 84 --
.../AbstractOutputChannelForwarder.java | 109 ---
.../ByteBufferedChannelManager.java | 816 -------------------
.../bytebuffered/CanceledChannelSet.java | 211 -----
.../bytebuffered/ChannelContext.java | 36 -
.../ConnectionInfoLookupResponse.java | 176 ----
.../taskmanager/bytebuffered/GateContext.java | 21 -
.../bytebuffered/IncomingConnection.java | 125 ---
.../bytebuffered/IncomingConnectionThread.java | 223 -----
.../bytebuffered/InputChannelContext.java | 21 -
.../bytebuffered/InputGateContext.java | 24 -
.../InsufficientResourcesException.java | 37 -
.../bytebuffered/NetworkConnectionManager.java | 173 ----
.../bytebuffered/OutgoingConnection.java | 531 ------------
.../bytebuffered/OutgoingConnectionThread.java | 270 ------
.../bytebuffered/OutputChannelContext.java | 17 -
.../OutputChannelForwardingChain.java | 84 --
.../bytebuffered/OutputGateContext.java | 22 -
.../bytebuffered/ReceiverNotFoundEvent.java | 169 ----
.../bytebuffered/RemoteReceiver.java | 157 ----
.../bytebuffered/SenderHintEvent.java | 119 ---
.../taskmanager/bytebuffered/TaskContext.java | 24 -
.../bytebuffered/UnexpectedEnvelopeEvent.java | 81 --
.../runtime/ExecutorThreadFactory.java | 35 -
.../taskmanager/runtime/ForwardingBarrier.java | 75 --
.../taskmanager/runtime/RuntimeDispatcher.java | 38 -
.../runtime/RuntimeInputChannelContext.java | 303 -------
.../runtime/RuntimeInputGateContext.java | 183 -----
.../runtime/RuntimeOutputChannelBroker.java | 206 -----
.../runtime/RuntimeOutputChannelContext.java | 76 --
.../runtime/RuntimeOutputGateContext.java | 159 ----
.../taskmanager/runtime/RuntimeTask.java | 346 --------
.../taskmanager/runtime/RuntimeTaskContext.java | 211 -----
.../transferenvelope/AbstractDeserializer.java | 355 --------
.../transferenvelope/AbstractSerializer.java | 274 -------
.../CapacityConstrainedArrayQueue.java | 322 --------
.../transferenvelope/DefaultDeserializer.java | 94 ---
.../transferenvelope/DefaultSerializer.java | 38 -
.../NoBufferAvailableException.java | 53 --
.../transferenvelope/TransferEnvelope.java | 165 ----
.../TransferEnvelopeDispatcher.java | 37 -
.../TransferEnvelopeReceiverList.java | 86 --
.../nephele/util/BufferPoolConnector.java | 45 +
.../eu/stratosphere/nephele/util/TaskUtils.java | 42 -
.../runtime/iterative/io/FakeOutputTask.java | 2 +-
.../iterative/task/IterationHeadPactTask.java | 16 +-
.../task/IterationIntermediatePactTask.java | 7 +-
.../task/IterationSynchronizationSinkTask.java | 2 +-
.../iterative/task/IterationTailPactTask.java | 1 +
.../pact/runtime/shipping/OutputCollector.java | 31 +-
.../pact/runtime/shipping/OutputEmitter.java | 2 +-
.../runtime/shipping/RecordOutputCollector.java | 48 +-
.../runtime/shipping/RecordOutputEmitter.java | 2 +
.../pact/runtime/task/DataSinkTask.java | 19 +-
.../pact/runtime/task/DataSourceTask.java | 24 +-
.../pact/runtime/task/RegularPactTask.java | 250 +++---
.../ExceptionInChainedStubException.java | 8 +
.../SynchronousChainedCombineDriver.java | 2 +
.../pact/runtime/task/util/ReaderIterator.java | 2 +-
.../runtime/task/util/RecordReaderIterator.java | 2 +-
.../java/eu/stratosphere/runtime/io/Buffer.java | 93 +++
.../stratosphere/runtime/io/BufferRecycler.java | 26 +
.../runtime/io/api/AbstractRecordReader.java | 98 +++
.../io/api/AbstractSingleGateRecordReader.java | 69 ++
.../io/api/AbstractUnionRecordReader.java | 155 ++++
.../runtime/io/api/BufferWriter.java | 67 ++
.../runtime/io/api/ChannelSelector.java | 39 +
.../runtime/io/api/MutableReader.java | 32 +
.../runtime/io/api/MutableRecordReader.java | 120 +++
.../io/api/MutableUnionRecordReader.java | 37 +
.../eu/stratosphere/runtime/io/api/Reader.java | 30 +
.../stratosphere/runtime/io/api/ReaderBase.java | 67 ++
.../runtime/io/api/RecordReader.java | 154 ++++
.../runtime/io/api/RecordWriter.java | 151 ++++
.../io/api/RoundRobinChannelSelector.java | 47 ++
.../runtime/io/api/UnionRecordReader.java | 67 ++
.../runtime/io/channels/BufferOrEvent.java | 52 ++
.../runtime/io/channels/Channel.java | 97 +++
.../runtime/io/channels/ChannelCloseEvent.java | 33 +
.../runtime/io/channels/ChannelID.java | 39 +
.../runtime/io/channels/ChannelType.java | 26 +
.../io/channels/EndOfSuperstepEvent.java | 34 +
.../runtime/io/channels/InputChannel.java | 493 +++++++++++
.../runtime/io/channels/OutputChannel.java | 193 +++++
.../ReceiverAlreadyClosedException.java | 22 +
.../eu/stratosphere/runtime/io/gates/Gate.java | 174 ++++
.../stratosphere/runtime/io/gates/GateID.java | 24 +
.../runtime/io/gates/InputChannelResult.java | 23 +
.../runtime/io/gates/InputGate.java | 384 +++++++++
.../runtime/io/gates/OutputGate.java | 165 ++++
.../io/gates/RecordAvailabilityListener.java | 36 +
.../runtime/io/network/ChannelManager.java | 646 +++++++++++++++
.../network/ConnectionInfoLookupResponse.java | 143 ++++
.../network/InsufficientResourcesException.java | 37 +
.../LocalReceiverCancelledException.java | 37 +
.../io/network/NetworkConnectionManager.java | 176 ++++
.../runtime/io/network/RemoteReceiver.java | 157 ++++
.../runtime/io/network/SenderHintEvent.java | 117 +++
.../BufferAvailabilityListener.java | 28 +
.../network/bufferprovider/BufferProvider.java | 69 ++
.../bufferprovider/BufferProviderBroker.java | 24 +
.../bufferprovider/GlobalBufferPool.java | 123 +++
.../network/bufferprovider/LocalBufferPool.java | 306 +++++++
.../bufferprovider/LocalBufferPoolOwner.java | 56 ++
.../bufferprovider/SerialSingleBufferPool.java | 77 ++
.../runtime/io/network/envelope/Envelope.java | 169 ++++
.../io/network/envelope/EnvelopeDispatcher.java | 46 ++
.../io/network/envelope/EnvelopeReader.java | 212 +++++
.../network/envelope/EnvelopeReceiverList.java | 75 ++
.../io/network/envelope/EnvelopeWriter.java | 134 +++
.../envelope/NoBufferAvailableException.java | 53 ++
.../io/network/tcp/IncomingConnection.java | 115 +++
.../network/tcp/IncomingConnectionThread.java | 226 +++++
.../io/network/tcp/OutgoingConnection.java | 529 ++++++++++++
.../network/tcp/OutgoingConnectionThread.java | 276 +++++++
.../AdaptiveSpanningRecordDeserializer.java | 521 ++++++++++++
.../io/serialization/DataInputDeserializer.java | 307 +++++++
.../io/serialization/DataOutputSerializer.java | 259 ++++++
.../io/serialization/RecordDeserializer.java | 56 ++
.../io/serialization/RecordSerializer.java | 60 ++
.../serialization/SpanningRecordSerializer.java | 153 ++++
.../eu/stratosphere/nephele/AbstractIDTest.java | 62 ++
.../ChannelDeploymentDescriptorTest.java | 2 +-
.../GateDeploymentDescriptorTest.java | 12 +-
.../TaskDeploymentDescriptorTest.java | 2 +-
.../executiongraph/ExecutionGraphTest.java | 30 +-
.../ForwardTask1Input1Output.java | 9 +-
.../ForwardTask1Input2Outputs.java | 14 +-
.../ForwardTask2Inputs1Output.java | 9 +-
.../executiongraph/SelfCrossForwardTask.java | 6 +-
.../executiongraph/SelfCrossInputTask.java | 6 +-
.../stratosphere/nephele/fs/LineReaderTest.java | 79 --
.../nephele/fs/s3/S3FileSystemTest.java | 461 -----------
.../stratosphere/nephele/io/AbstractIDTest.java | 62 --
.../nephele/io/DefaultChannelSelectorTest.java | 46 --
.../nephele/io/channels/BufferTest.java | 192 -----
.../nephele/io/channels/MemoryBufferTest.java | 155 ----
.../io/channels/serialization/BooleanType.java | 77 --
.../channels/serialization/ByteArrayType.java | 88 --
.../serialization/ByteSubArrayType.java | 98 ---
.../io/channels/serialization/ByteType.java | 77 --
.../io/channels/serialization/CharType.java | 78 --
.../serialization/DeSerializerTest.java | 299 -------
.../io/channels/serialization/DoubleType.java | 78 --
.../io/channels/serialization/FloatType.java | 77 --
.../io/channels/serialization/IntType.java | 77 --
.../io/channels/serialization/LongType.java | 77 --
.../serialization/SerializationTestType.java | 52 --
.../io/channels/serialization/ShortType.java | 77 --
.../channels/serialization/UTFStringType.java | 86 --
.../serialization/UnsignedByteType.java | 77 --
.../serialization/UnsignedShortType.java | 77 --
.../io/library/FileLineReadWriteTest.java | 134 ---
.../nephele/jobmanager/DoubleSourceTask.java | 11 +-
.../nephele/jobmanager/DoubleTargetTask.java | 10 +-
.../nephele/jobmanager/ExceptionTask.java | 6 +-
.../nephele/jobmanager/ForwardTask.java | 10 +-
.../nephele/jobmanager/JobManagerITCase.java | 76 +-
.../nephele/jobmanager/UnionTask.java | 12 +-
.../scheduler/queue/QueueSchedulerTest.java | 10 +-
.../managementgraph/ManagementGraphTest.java | 6 +-
.../profiling/impl/InstanceProfilerTest.java | 2 +-
.../DefaultDeserializerTest.java | 358 --------
.../transferenvelope/DefaultSerializerTest.java | 313 -------
.../nephele/util/BufferPoolConnector.java | 53 --
.../nephele/util/DiscardingRecycler.java | 24 +
.../nephele/util/FileLineReader.java | 80 ++
.../nephele/util/FileLineWriter.java | 75 ++
.../nephele/util/TestBufferProvider.java | 76 ++
.../pact/runtime/task/DataSinkTaskTest.java | 36 +-
.../pact/runtime/task/DataSourceTaskTest.java | 28 +-
.../runtime/task/chaining/ChainTaskTest.java | 11 +-
.../runtime/task/util/OutputEmitterTest.java | 1 +
.../task/util/RecordOutputEmitterTest.java | 2 +
.../pact/runtime/test/util/MockEnvironment.java | 169 ++--
.../pact/runtime/test/util/TaskTestBase.java | 4 +-
.../stratosphere/runtime/fs/LineReaderTest.java | 78 ++
.../runtime/fs/s3/S3FileSystemTest.java | 460 +++++++++++
.../runtime/io/DefaultChannelSelectorTest.java | 47 ++
.../io/library/FileLineReadWriteTest.java | 136 ++++
.../envelope/EnvelopeReaderWriterTest.java | 394 +++++++++
.../DataInputOutputSerializerTest.java | 115 +++
.../io/serialization/PagedViewsTest.java | 160 ++++
.../SpanningRecordSerializationTest.java | 164 ++++
.../SpanningRecordSerializerTest.java | 219 +++++
.../io/serialization/types/AsciiStringType.java | 77 ++
.../io/serialization/types/BooleanType.java | 67 ++
.../io/serialization/types/ByteArrayType.java | 76 ++
.../serialization/types/ByteSubArrayType.java | 91 +++
.../io/serialization/types/ByteType.java | 67 ++
.../io/serialization/types/CharType.java | 68 ++
.../io/serialization/types/DoubleType.java | 68 ++
.../io/serialization/types/FloatType.java | 67 ++
.../runtime/io/serialization/types/IntType.java | 67 ++
.../io/serialization/types/LongType.java | 67 ++
.../types/SerializationTestType.java | 26 +
.../types/SerializationTestTypeFactory.java | 40 +
.../io/serialization/types/ShortType.java | 67 ++
.../serialization/types/UnsignedByteType.java | 67 ++
.../serialization/types/UnsignedShortType.java | 67 ++
.../runtime/io/serialization/types/Util.java | 90 ++
.../BroadcastVarsNepheleITCase.java | 4 +-
.../KMeansIterativeNepheleITCase.java | 3 +-
.../test/iterative/nephele/JobGraphUtils.java | 4 +-
.../CustomCompensatableDanglingPageRank.java | 10 +-
...mpensatableDanglingPageRankWithCombiner.java | 10 +-
.../CompensatableDanglingPageRank.java | 10 +-
.../query1Util/LineItemFilterTest.java | 6 +-
357 files changed, 12871 insertions(+), 20521 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/record/SpargelIteration.java b/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/record/SpargelIteration.java
index 74bcaf3..14dcb35 100644
--- a/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/record/SpargelIteration.java
+++ b/stratosphere-addons/spargel/src/main/java/eu/stratosphere/spargel/java/record/SpargelIteration.java
@@ -155,6 +155,7 @@ public class SpargelIteration {
@Override
public void coGroup(Iterator<Record> messages, Iterator<Record> vertex, Collector<Record> out) throws Exception {
+
if (vertex.hasNext()) {
Record first = vertex.next();
first.getFieldInto(0, vertexKey);
@@ -270,4 +271,4 @@ public class SpargelIteration {
this.messagingFunction.postSuperstep();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index 80ae308..53b4cc1 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -52,8 +52,9 @@ import eu.stratosphere.compiler.plan.WorksetPlanNode;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.configuration.ConfigConstants;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobOutputVertex;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
@@ -1224,7 +1225,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// connect the fake tail
try {
- rootOfStepFunctionVertex.connectTo(fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ rootOfStepFunctionVertex.connectTo(fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
} catch (JobGraphDefinitionException e) {
throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task");
}
@@ -1267,7 +1268,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// connect the fake tail
try {
- rootOfTerminationCriterionVertex.connectTo(fakeTailTerminationCriterion, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ rootOfTerminationCriterionVertex.connectTo(fakeTailTerminationCriterion, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
} catch (JobGraphDefinitionException e) {
throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task for termination criterion");
}
@@ -1401,7 +1402,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// connect the fake tail
try {
- nextWorksetVertex.connectTo(fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ nextWorksetVertex.connectTo(fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
} catch (JobGraphDefinitionException e) {
throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task");
}
@@ -1440,7 +1441,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// connect the fake tail
try {
- solutionDeltaVertex.connectTo(fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+ solutionDeltaVertex.connectTo(fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
} catch (JobGraphDefinitionException e) {
throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
index 51c0a2f..3b9ba3d 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
@@ -344,12 +344,6 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
- /**
- * The config parameter defining whether to use the special multicast logic
- * for broadcasts.
- */
- public static final boolean DEFAULT_USE_MULTICAST_FOR_BROADCAST = false;
-
// ------------------------ File System Bahavior ------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-core/src/main/java/eu/stratosphere/core/memory/MemorySegment.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/memory/MemorySegment.java b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/MemorySegment.java
index 2698992..2d63fa4 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/core/memory/MemorySegment.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/MemorySegment.java
@@ -103,6 +103,11 @@ public class MemorySegment {
public final boolean isFreed() {
return this.memory == null;
}
+
+ public final void free() {
+ this.wrapper = null;
+ this.memory = null;
+ }
/**
* Gets the size of the memory segment, in bytes. Because segments
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java b/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
index 73671fa..5b06547 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
@@ -70,7 +70,7 @@ public final class Record implements Value, CopyableValue<Record> {
private Value[] writeFields; // the cache for objects into which the binary representations are read
- private int binaryLen; // the length of the contents in the binary buffer that is valid
+ public int binaryLen; // the length of the contents in the binary buffer that is valid
private int numFields; // the number of fields in the record
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
new file mode 100644
index 0000000..476e22a
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
@@ -0,0 +1,177 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.util.StringUtils;
+
+/**
+ * A statistically unique identification number.
+ */
+public class AbstractID implements IOReadableWritable {
+
+ /** The size of a long in bytes */
+ private static final int SIZE_OF_LONG = 8;
+
+ /** The size of the ID in byte */
+ public static final int SIZE = 2 * SIZE_OF_LONG;
+
+ /** The upper part of the actual ID */
+ private long upperPart;
+
+ /** The lower part of the actual ID */
+ private long lowerPart;
+
+ /**
+ * Constructs a new ID with a specific bytes value.
+ */
+ public AbstractID(byte[] bytes) {
+ if (bytes.length != SIZE) {
+ throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes");
+ }
+
+ this.lowerPart = byteArrayToLong(bytes, 0);
+ this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG);
+ }
+
+ /**
+ * Constructs a new abstract ID.
+ *
+ * @param lowerPart the lower bytes of the ID
+ * @param upperPart the higher bytes of the ID
+ */
+ public AbstractID(long lowerPart, long upperPart) {
+ this.lowerPart = lowerPart;
+ this.upperPart = upperPart;
+ }
+
+ /**
+ * Creates a new abstract ID from the given one.
+ * <p>
+ * The given and the newly created abstract ID will be identical, i.e. a comparison by <code>equals</code> will
+ * return <code>true</code> and both objects will have the same hash code.
+ *
+ * @param id the abstract ID to copy
+ */
+ public AbstractID(AbstractID id) {
+ this.lowerPart = id.lowerPart;
+ this.upperPart = id.upperPart;
+ }
+
+ /**
+ * Constructs a new random ID from a uniform distribution.
+ */
+ public AbstractID() {
+ this.lowerPart = generateRandomLong();
+ this.upperPart = generateRandomLong();
+ }
+
+ /**
+ * Generates a uniformly distributed random positive long.
+ *
+ * @return a uniformly distributed random positive long
+ */
+ protected static long generateRandomLong() {
+ return (long) (Math.random() * Long.MAX_VALUE);
+ }
+
+ /**
+ * Converts the given byte array to a long.
+ *
+ * @param ba the byte array to be converted
+ * @param offset the offset indicating at which byte inside the array the conversion shall begin
+ * @return the long variable
+ */
+ private static long byteArrayToLong(byte[] ba, int offset) {
+ long l = 0;
+
+ for (int i = 0; i < SIZE_OF_LONG; ++i) {
+ l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
+ }
+
+ return l;
+ }
+
+ /**
+ * Converts a long to a byte array.
+ *
+ * @param l the long variable to be converted
+ * @param ba the byte array to store the result the of the conversion
+ * @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored
+ */
+ private static void longToByteArray(final long l, final byte[] ba, final int offset) {
+ for (int i = 0; i < SIZE_OF_LONG; ++i) {
+ final int shift = i << 3; // i * 8
+ ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift);
+ }
+ }
+
+ /**
+ * Sets an ID from another ID by copying its internal byte representation.
+ *
+ * @param src source ID
+ */
+ public void setID(AbstractID src) {
+ this.lowerPart = src.lowerPart;
+ this.upperPart = src.upperPart;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj instanceof AbstractID) {
+ AbstractID src = (AbstractID) obj;
+ return src.lowerPart == this.lowerPart && src.upperPart == this.upperPart;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return ((int) this.lowerPart) ^
+ ((int) (this.lowerPart >>> 32)) ^
+ ((int) this.upperPart) ^
+ ((int) (this.upperPart >>> 32));
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.lowerPart = in.readLong();
+ this.upperPart = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(this.lowerPart);
+ out.writeLong(this.upperPart);
+ }
+
+ public void write(ByteBuffer buffer) {
+ buffer.putLong(this.lowerPart);
+ buffer.putLong(this.upperPart);
+ }
+
+ @Override
+ public String toString() {
+ final byte[] ba = new byte[SIZE];
+ longToByteArray(this.lowerPart, ba, 0);
+ longToByteArray(this.upperPart, ba, SIZE_OF_LONG);
+ return StringUtils.byteToHexString(ba);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/ForceCheckpoint.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/ForceCheckpoint.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/ForceCheckpoint.java
deleted file mode 100644
index 012dc02..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/ForceCheckpoint.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.annotations;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation to force a checkpoint decision.
- *
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-public @interface ForceCheckpoint {
-
- boolean checkpoint();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateful.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateful.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateful.java
deleted file mode 100644
index 433478b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateful.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.annotations;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * This annotation can be used to indicate a stateful task.
- *
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-public @interface Stateful {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateless.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateless.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateless.java
deleted file mode 100644
index b3d5e38..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/Stateless.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.annotations;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * This annotation can be used to indicate a stateless task.
- *
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-public @interface Stateless {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/TaskAnnotation.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/TaskAnnotation.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/TaskAnnotation.java
deleted file mode 100644
index 317b112..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/annotations/TaskAnnotation.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.annotations;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.CONSTRUCTOR)
-public @interface TaskAnnotation {
-
- boolean statefull();
- double selektivity() default 1;
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
index c39aad0..4068e5b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
@@ -18,7 +18,7 @@ import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
/**
* A channel deployment descriptor contains all the information necessary to deploy either an input or an output channel
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
index 8a78154..02d6578 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
@@ -21,8 +21,8 @@ import java.util.Iterator;
import java.util.List;
import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.util.EnumUtils;
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventExample.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventExample.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventExample.java
deleted file mode 100644
index 4e6b2e4..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventExample.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-/*
- * Copyright 2010 casp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * under the License.
- */
-package eu.stratosphere.nephele.example.events;
-
-import java.io.IOException;
-
-import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.client.JobClient;
-import eu.stratosphere.nephele.client.JobSubmissionResult;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.library.FileLineReader;
-import eu.stratosphere.nephele.io.library.FileLineWriter;
-import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-
-/**
- */
-public class EventExample {
-
- public static void main(String[] args) {
-
- JobGraph jobGraph = new JobGraph("Grep Example Job");
-
- JobFileInputVertex input = new JobFileInputVertex("Input 1", jobGraph);
- input.setFileInputClass(FileLineReader.class);
- input.setFilePath(new Path("file:///Users/casp/test2.txt"));
-
- JobTaskVertex task1 = new JobTaskVertex("Task 1", jobGraph);
- task1.setTaskClass(EventSender.class);
-
- JobTaskVertex task2 = new JobTaskVertex("Task 2", jobGraph);
- task2.setTaskClass(EventReceiver.class);
-
- JobFileOutputVertex output = new JobFileOutputVertex("Output 1", jobGraph);
- output.setFileOutputClass(FileLineWriter.class);
- output.setFilePath(new Path("file:///Users/casp/output.txt"));
-
- jobGraph.addJar(new Path("file:///Users/casp/EventTask.jar"));
- jobGraph.addJar(new Path("file:///Users/casp/StringTaskEvent.jar"));
- try {
-
- input.connectTo(task1, ChannelType.INMEMORY);
- task1.connectTo(task2, ChannelType.INMEMORY);
- task2.connectTo(output, ChannelType.INMEMORY);
-
- } catch (JobGraphDefinitionException e) {
- e.printStackTrace();
- }
-
- Configuration conf = new Configuration();
- conf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1");
- conf.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, "6023");
-
- try {
- JobClient jobClient = new JobClient(jobGraph, conf);
- JobSubmissionResult result = jobClient.submitJob();
- System.out.println(result.getDescription());
- } catch (IOException ioe) {
- ioe.printStackTrace();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventReceiver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventReceiver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventReceiver.java
deleted file mode 100644
index 3e5d322..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventReceiver.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.events;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.event.task.StringTaskEvent;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- */
-public class EventReceiver extends AbstractTask {
-
- // this is just a dummy input gate...
- private RecordReader<StringRecord> input = null;
-
- private RecordWriter<StringRecord> output = null;
-
- @Override
- public void registerInputOutput() {
- this.input = new RecordReader<StringRecord>(this, StringRecord.class);
- this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
- this.input.subscribeToEvent(new MyEventListener(), StringTaskEvent.class);
- }
-
- @Override
- public void invoke() throws Exception {
-
- while (this.input.hasNext()) {
-
- StringRecord s = input.next();
- this.output.emit(s);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventSender.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventSender.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventSender.java
deleted file mode 100644
index 4961dac..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/EventSender.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.events;
-
-/*
- * Copyright 2010 casp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * under the License.
- */
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.event.task.StringTaskEvent;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- */
-public class EventSender extends AbstractTask {
-
- // this is just a dummy output gate...
- private RecordReader<StringRecord> input = null;
-
- private RecordWriter<StringRecord> output = null;
-
- @Override
- public void registerInputOutput() {
- this.input = new RecordReader<StringRecord>(this, StringRecord.class);
- this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
- }
-
- @Override
- public void invoke() throws Exception {
-
- int i = 1;
- while (this.input.hasNext()) {
- i++;
- if (i % 1000 == 0) {
- this.output.publishEvent(new StringTaskEvent("this is the " + i + "th message"));
- }
- StringRecord s = input.next();
- this.output.emit(s);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/MyEventListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/MyEventListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/MyEventListener.java
deleted file mode 100644
index f285d44..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/events/MyEventListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-/*
- * Copyright 2010 casp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * under the License.
- */
-package eu.stratosphere.nephele.example.events;
-
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.event.task.EventListener;
-import eu.stratosphere.nephele.event.task.StringTaskEvent;
-
-/**
- */
-public class MyEventListener implements EventListener {
-
- @Override
- public void eventOccurred(AbstractTaskEvent event) {
- if (event instanceof StringTaskEvent) {
- System.out.println("Message Event received: " + ((StringTaskEvent) event).getString());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/Grep.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/Grep.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/Grep.java
deleted file mode 100644
index 9afef9b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/Grep.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.grep;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.client.JobClient;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.library.FileLineReader;
-import eu.stratosphere.nephele.io.library.FileLineWriter;
-import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.util.JarFileCreator;
-
-public class Grep {
-
- public static void main(String[] args) {
-
- JobGraph jobGraph = new JobGraph("Grep Example Job");
-
- JobFileInputVertex input = new JobFileInputVertex("Input 1", jobGraph);
- input.setFileInputClass(FileLineReader.class);
- input.setFilePath(new Path("file:///home/ec2-user/test.txt"));
- input.setInstanceType("t1.micro");
-
- JobTaskVertex task1 = new JobTaskVertex("Task 1", jobGraph);
- task1.setTaskClass(GrepTask.class);
- task1.setInstanceType("t1.micro");
-
-
- JobFileOutputVertex output = new JobFileOutputVertex("Output 1", jobGraph);
- output.setFileOutputClass(FileLineWriter.class);
- output.setFilePath(new Path("file:///tmp/"));
- output.setInstanceType("t1.micro");
-
- try {
-
- input.connectTo(task1, ChannelType.INMEMORY);
- task1.connectTo(output, ChannelType.INMEMORY);
-
- } catch (JobGraphDefinitionException e) {
- e.printStackTrace();
- }
-
- // Create jar file and attach it
- final File jarFile = new File("/tmp/grepJob.jar");
- final JarFileCreator jarFileCreator = new JarFileCreator(jarFile);
- jarFileCreator.addClass(GrepTask.class);
-
- try {
- jarFileCreator.createJarFile();
- System.out.println("done creating!!");
- } catch (IOException ioe) {
-
- if (jarFile.exists()) {
- jarFile.delete();
- }
-
- System.out.println("ERROR creating jar");
- return;
- }
-
- jobGraph.addJar(new Path("file://" + jarFile.getAbsolutePath()));
-
- // Submit job
- Configuration conf = new Configuration();
-
- jobGraph.getJobConfiguration().setString("job.cloud.awsaccessid", "xxx");
- jobGraph.getJobConfiguration().setString("job.cloud.awssecretkey", "xxx");
- jobGraph.getJobConfiguration().setString("job.cloud.sshkeypair", "caspeu");
- jobGraph.getJobConfiguration().setString("job.ec2.image.id", "ami-d64474a2");
-
- InetSocketAddress jobmanager = new InetSocketAddress("127.0.0.1", 6123);
-
-
- try {
- final JobClient jobClient = new JobClient(jobGraph, conf, jobmanager);
- System.out.println("submitting");
- jobClient.submitJobAndWait();
- System.out.println("done.");
- } catch (Exception e) {
- System.out.println(e);
- }
-
- if (jarFile.exists()) {
- jarFile.delete();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/GrepTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/GrepTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/GrepTask.java
deleted file mode 100644
index ae72f1c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/grep/GrepTask.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.grep;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-public class GrepTask extends AbstractTask {
-
- private RecordReader<StringRecord> input = null;
-
- private RecordWriter<StringRecord> output = null;
-
- private int i = 0;
-
- @Override
- public void invoke() throws Exception {
-
- while (this.input.hasNext()) {
-
- StringRecord s = input.next();
- this.output.emit(s);
- i++;
- }
-
- System.out.println("GREP: Emmited all " + i + " records");
- }
-
- @Override
- public void registerInputOutput() {
- this.input = new RecordReader<StringRecord>(this, StringRecord.class);
- this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTest.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTest.java
deleted file mode 100644
index f85024c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.speedtest;
-
-import java.io.File;
-import java.io.IOException;
-
-import eu.stratosphere.api.common.JobExecutionResult;
-import eu.stratosphere.configuration.ConfigConstants;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.client.JobClient;
-import eu.stratosphere.nephele.client.JobExecutionException;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGenericOutputVertex;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
-import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
-import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.util.JarFileCreator;
-
-/**
- * This class implements a speed test for Nephele. It's primary purpose is to benchmark the performance of Nephele's
- * network channels with different degrees of parallelism.
- *
- */
-public final class SpeedTest {
-
- /**
- * Configuration key to specify the amount of data to be send in GB.
- */
- static final String DATA_VOLUME_CONFIG_KEY = "data.volume";
-
- /**
- * Entry point to the application.
- *
- * @param args
- * the provided arguments
- */
- public static void main(final String[] args) {
-
- // Parse the arguments first
- if (args.length < 4) {
- System.err
- .println("Insufficient number of arguments. Please provide <job manager address> <amount of data to send in GB> <number of subtasks> <number of subtasks per task> (<use forwarder>)");
- System.exit(1);
- return;
- }
-
- // Determine the job manager address
- final String jobManagerAddress = args[0];
-
- // Determine amount of data to send in GB
- int amountOfDataToSend = 0;
- try {
- amountOfDataToSend = Integer.parseInt(args[1]);
- } catch (NumberFormatException e) {
- System.err.println("Cannot parse amount of data to send. Please provide a positive integer value.");
- System.exit(1);
- return;
- }
-
- if (amountOfDataToSend <= 0 || amountOfDataToSend > 1024) {
- System.err
- .println("Please provide an integer value between 1 and 1024 indicating the amount of data to send in GB.");
- System.exit(1);
- return;
- }
-
- // Determine the number of subtasks
- int numberOfSubtasks = 0;
- try {
- numberOfSubtasks = Integer.parseInt(args[2]);
- } catch (NumberFormatException e) {
- System.err.println("Cannot parse the number of subtasks. Please provide a positive integer value.");
- System.exit(1);
- return;
- }
-
- if (numberOfSubtasks <= 0) {
- System.err.println("Please provide a positive integer value indicating the number of subtasks.");
- System.exit(1);
- return;
- }
-
- // Determine the number of subtasks per instance
- int numberOfSubtasksPerInstance = 0;
- try {
- numberOfSubtasksPerInstance = Integer.parseInt(args[3]);
- } catch (NumberFormatException e) {
- System.err
- .println("Cannot parse the number of subtasks per instance. Please provide a positive integer value.");
- System.exit(1);
- return;
- }
-
- if (numberOfSubtasksPerInstance <= 0) {
- System.err
- .println("Please provide a positive integer value indicating the number of subtasks per instance.");
- System.exit(1);
- return;
- }
-
- // Determine whether to use a forwarder or not
- boolean useForwarder = false;
- if (args.length >= 5) {
- useForwarder = Boolean.parseBoolean(args[4]);
- }
-
- final JobGraph jobGraph = new JobGraph("Nephele Speed Test");
-
- final JobGenericInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
- producer.setInputClass(SpeedTestProducer.class);
- producer.setNumberOfSubtasks(numberOfSubtasks);
- producer.setNumberOfSubtasksPerInstance(numberOfSubtasksPerInstance);
- producer.getConfiguration().setInteger(DATA_VOLUME_CONFIG_KEY, amountOfDataToSend);
-
- JobTaskVertex forwarder = null;
- if (useForwarder) {
- forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
- forwarder.setTaskClass(SpeedTestForwarder.class);
- forwarder.setNumberOfSubtasks(numberOfSubtasks);
- forwarder.setNumberOfSubtasksPerInstance(numberOfSubtasksPerInstance);
- }
-
- final JobGenericOutputVertex consumer = new JobGenericOutputVertex("Speed Test Consumer", jobGraph);
- consumer.setOutputClass(SpeedTestConsumer.class);
- consumer.setNumberOfSubtasks(numberOfSubtasks);
- consumer.setNumberOfSubtasksPerInstance(numberOfSubtasksPerInstance);
-
- // Set vertex sharing
- producer.setVertexToShareInstancesWith(consumer);
- if (forwarder != null) {
- forwarder.setVertexToShareInstancesWith(consumer);
- }
-
- // Connect the vertices
- try {
- if (forwarder == null) {
- producer.connectTo(consumer, ChannelType.NETWORK,
- DistributionPattern.BIPARTITE);
- } else {
- producer.connectTo(forwarder, ChannelType.NETWORK,
- DistributionPattern.BIPARTITE);
- forwarder.connectTo(consumer, ChannelType.NETWORK,
- DistributionPattern.BIPARTITE);
- }
-
- } catch (JobGraphDefinitionException e) {
- e.printStackTrace();
- System.exit(1);
- return;
- }
-
- File jarFile = null;
-
- try {
-
- // Create jar file of job
- jarFile = File.createTempFile("speedtest", "jar");
- jarFile.deleteOnExit();
-
- final JarFileCreator jfc = new JarFileCreator(jarFile);
- jfc.addClass(SpeedTest.class);
- jfc.addClass(SpeedTestProducer.class);
- jfc.addClass(SpeedTestForwarder.class);
- jfc.addClass(SpeedTestConsumer.class);
- jfc.addClass(SpeedTestRecord.class);
- jfc.createJarFile();
-
- jobGraph.addJar(new Path("file://" + jarFile.getAbsolutePath()));
-
- final Configuration clientConfiguration = new Configuration();
- clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress);
- clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-
- final JobClient jobClient = new JobClient(jobGraph, clientConfiguration);
- final JobExecutionResult jobResult = jobClient.submitJobAndWait();
- final long executionTime = jobResult.getNetRuntime();
-
- // Calculate throughput in MBit/s and output it
- System.out.print("Job finished with a throughput of " + toMBitPerSecond(amountOfDataToSend, executionTime));
-
- } catch (IOException ioe) {
- ioe.printStackTrace();
- System.exit(1);
- return;
- } catch (JobExecutionException jee) {
- jee.printStackTrace();
- System.exit(1);
- return;
- }
- }
-
- /**
- * Calculates the throughput in MBit/s from the given amount of data that has been sent and the execution time.
- *
- * @param amountOfDataToSend
- * the amount of data that has been sent in GB
- * @param executionTime
- * the execution time in milliseconds
- * @return the resulting throughput in MBit/s
- */
- private static int toMBitPerSecond(final int amountOfDataToSend, final long executionTime) {
-
- final double dataVolumeInMBit = amountOfDataToSend * 8192.0;
- final double executionTimeInSeconds = executionTime / 1000.0;
-
- return (int) Math.round(dataVolumeInMBit / executionTimeInSeconds);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestConsumer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestConsumer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestConsumer.java
deleted file mode 100644
index eb40196..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestConsumer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.speedtest;
-
-import eu.stratosphere.nephele.io.MutableRecordReader;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-
-/**
- * This class implements the consumer task of the speed test. The consumer task simply drops all received records.
- *
- */
-public class SpeedTestConsumer extends AbstractOutputTask {
-
- /**
- * The record reader used to read the incoming records.
- */
- private MutableRecordReader<SpeedTestRecord> input;
-
-
- @Override
- public void registerInputOutput() {
-
- this.input = new MutableRecordReader<SpeedTestRecord>(this);
- }
-
-
- @Override
- public void invoke() throws Exception {
-
- final SpeedTestRecord record = new SpeedTestRecord();
- while (this.input.next(record)) {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestForwarder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestForwarder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestForwarder.java
deleted file mode 100644
index 91090a8..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestForwarder.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.speedtest;
-
-import eu.stratosphere.nephele.io.MutableRecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractTask;
-
-/**
- * This class implements a forwarder task for the speed test. The forwarder task immediately outputs every record it
- * reads.
- *
- */
-public final class SpeedTestForwarder extends AbstractTask {
-
- /**
- * The record reader used to read incoming records.
- */
- private MutableRecordReader<SpeedTestRecord> input;
-
- /**
- * The record writer used to forward incoming records.
- */
- private RecordWriter<SpeedTestRecord> output;
-
-
- @Override
- public void registerInputOutput() {
-
- this.input = new MutableRecordReader<SpeedTestRecord>(this);
- this.output = new RecordWriter<SpeedTestRecord>(this, SpeedTestRecord.class);
- }
-
-
- @Override
- public void invoke() throws Exception {
-
- final SpeedTestRecord record = new SpeedTestRecord();
- while (this.input.next(record)) {
- this.output.emit(record);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestProducer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestProducer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestProducer.java
deleted file mode 100644
index 78ea23d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestProducer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.speedtest;
-
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractGenericInputTask;
-
-/**
- * This class implements the producer task which produces test records for the speed test.
- *
- */
-public final class SpeedTestProducer extends AbstractGenericInputTask {
-
- /**
- * The record writer to emit the produced records.
- */
- private RecordWriter<SpeedTestRecord> writer;
-
-
- @Override
- public void registerInputOutput() {
-
- this.writer = new RecordWriter<SpeedTestRecord>(this, SpeedTestRecord.class);
- }
-
-
- @Override
- public void invoke() throws Exception {
-
- // Determine the amount of data to send per subtask
- final int dataVolumePerSubtaskInMB = getTaskConfiguration().getInteger(SpeedTest.DATA_VOLUME_CONFIG_KEY, 1)
- * 1024 / getCurrentNumberOfSubtasks();
-
- final long numberOfRecordsToEmit = dataVolumePerSubtaskInMB * 1024 * 1024 / SpeedTestRecord.RECORD_SIZE;
-
- final SpeedTestRecord record = new SpeedTestRecord();
-
- for (long i = 0; i < numberOfRecordsToEmit; ++i) {
- this.writer.emit(record);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestRecord.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestRecord.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestRecord.java
deleted file mode 100644
index b3522f3..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/speedtest/SpeedTestRecord.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.speedtest;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-
-/**
- * This class implements the record type used for the speed test.
- */
-public final class SpeedTestRecord implements IOReadableWritable {
-
- /**
- * The size of a single record in bytes.
- */
- static final int RECORD_SIZE = 128;
-
- /**
- * The byte buffer which actually stored the record's data.
- */
- private final byte[] buf = new byte[RECORD_SIZE];
-
- /**
- * Constructs a new record and initializes it.
- */
- public SpeedTestRecord() {
- for (int i = 0; i < RECORD_SIZE; ++i) {
- this.buf[i] = (byte) (i % 128);
- }
- }
-
- @Override
- public void write(final DataOutput out) throws IOException {
- out.write(this.buf);
- }
-
- @Override
- public void read(final DataInput in) throws IOException {
- in.readFully(this.buf);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ConsumerTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ConsumerTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ConsumerTask.java
deleted file mode 100644
index 209a81e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ConsumerTask.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.union;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.template.AbstractFileOutputTask;
-
-public class ConsumerTask extends AbstractFileOutputTask {
-
- private RecordReader<StringRecord> input;
-
- @Override
- public void registerInputOutput() {
-
- this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-
- }
-
- @Override
- public void invoke() throws Exception {
-
- int count = 0;
-
- while (this.input.hasNext()) {
- this.input.next();
- ++count;
- }
-
- System.out.println("Consumer receiver " + count + " records in total");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ProducerTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ProducerTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ProducerTask.java
deleted file mode 100644
index 647a07b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/example/union/ProducerTask.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.example.union;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-
-public class ProducerTask extends AbstractFileInputTask {
-
- private static final int NUMBER_OF_RECORDS_TO_PRODUCE = 1000000;
-
- private RecordWriter<StringRecord> output;
-
- @Override
- public void registerInputOutput() {
-
- this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
- }
-
- @Override
- public void invoke() throws Exception {
-
- for (int i = 0; i < NUMBER_OF_RECORDS_TO_PRODUCE; ++i) {
-
- final StringRecord record = new StringRecord("Record " + i + " of " + this);
- this.output.emit(record);
- }
-
- }
-
-}