You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:49 UTC
[14/30] Offer buffer-oriented API for I/O (#25)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/Gate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/Gate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/Gate.java
new file mode 100644
index 0000000..c9db615
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/Gate.java
@@ -0,0 +1,174 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.gates;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
+import eu.stratosphere.nephele.event.task.EventListener;
+import eu.stratosphere.nephele.event.task.EventNotificationManager;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+import java.io.IOException;
+
+/**
+ * In Nephele a gate represents the connection between a user program and the processing framework. A gate
+ * must be connected to exactly one record reader/writer and to at least one channel. The <code>Gate</code> class itself
+ * is abstract. A gate automatically created for every record reader/writer in the user program. A gate can only be used
+ * to transport one specific type of records.
+ * <p>
+ * This class in general is not thread-safe.
+ *
+ * @param <T>
+ * the record type to be transported from this gate
+ *
+ * TODO refactor with changes to input side
+ */
+public abstract class Gate<T extends IOReadableWritable> {
+
+ /**
+ * The ID of the job this gate belongs to.
+ */
+ private final JobID jobID;
+
+ /**
+ * The ID of this gate.
+ */
+ private final GateID gateID;
+
+ /**
+ * The index of the gate in the list of available input/output gates.
+ */
+ private final int index;
+
+ /**
+ * The event notification manager used to dispatch events.
+ */
+ private final EventNotificationManager eventNotificationManager = new EventNotificationManager();
+
+ /**
+ * The type of input/output channels connected to this gate.
+ */
+ private ChannelType channelType = ChannelType.NETWORK;
+
+ /**
+ * Constructs a new abstract gate
+ *
+ * @param jobID
+ * the ID of the job this gate belongs to
+ * @param gateID
+ * the ID of this gate
+ * @param index
+ * the index of the gate in the list of available input/output gates.
+ */
+ protected Gate(final JobID jobID, final GateID gateID, final int index) {
+ this.jobID = jobID;
+ this.gateID = gateID;
+ this.index = index;
+ }
+
+ public final int getIndex() {
+ return this.index;
+ }
+
+ /**
+ * Returns the event notification manager used to dispatch events.
+ *
+ * @return the event notification manager used to dispatch events
+ */
+ protected final EventNotificationManager getEventNotificationManager() {
+ return this.eventNotificationManager;
+ }
+
+ public String toString() {
+
+ return "Gate " + this.index;
+ }
+
+ public final void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
+
+ this.eventNotificationManager.subscribeToEvent(eventListener, eventType);
+ }
+
+ public final void unsubscribeFromEvent(final EventListener eventListener,
+ final Class<? extends AbstractTaskEvent> eventType) {
+
+ this.eventNotificationManager.unsubscribeFromEvent(eventListener, eventType);
+ }
+
+ public final void deliverEvent(final AbstractTaskEvent event) {
+
+ this.eventNotificationManager.deliverEvent((AbstractTaskEvent) event);
+ }
+
+ public final void setChannelType(final ChannelType channelType) {
+
+ this.channelType = channelType;
+ }
+
+ public final ChannelType getChannelType() {
+
+ return this.channelType;
+ }
+
+ public JobID getJobID() {
+
+ return this.jobID;
+ }
+
+ public GateID getGateID() {
+
+ return this.gateID;
+ }
+
+ // FROM GATE INTERFACE
+
+ /**
+ * Publishes an event.
+ *
+ * @param event
+ * the event to be published
+ * @throws IOException
+ * thrown if an error occurs while transmitting the event
+ * @throws InterruptedException
+ * thrown if the thread is interrupted while waiting for the event to be published
+ */
+ abstract public void publishEvent(AbstractEvent event) throws IOException, InterruptedException;
+
+ /**
+ * Releases the allocated resources (particularly buffer) of all channels attached to this gate. This method
+ * should only be called after the respected task has stopped running.
+ */
+ abstract public void releaseAllChannelResources();
+
+ /**
+ * Checks if the gate is closed. The gate is closed if all this associated channels are closed.
+ *
+ * @return <code>true</code> if the gate is closed, <code>false</code> otherwise
+ * @throws IOException
+ * thrown if any error occurred while closing the gate
+ * @throws InterruptedException
+ * thrown if the gate is interrupted while waiting for this operation to complete
+ */
+ abstract public boolean isClosed() throws IOException, InterruptedException;
+
+ /**
+ * Checks if the considered gate is an input gate.
+ *
+ * @return <code>true</code> if the considered gate is an input gate, <code>false</code> if it is an output gate
+ */
+ abstract public boolean isInputGate();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/GateID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/GateID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/GateID.java
new file mode 100644
index 0000000..8375c88
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/GateID.java
@@ -0,0 +1,24 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.gates;
+
+import eu.stratosphere.nephele.AbstractID;
+
+/**
+ * A class for statistically unique gate IDs.
+ *
+ */
+public final class GateID extends AbstractID {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputChannelResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputChannelResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputChannelResult.java
new file mode 100644
index 0000000..e2083e5
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputChannelResult.java
@@ -0,0 +1,23 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+package eu.stratosphere.runtime.io.gates;
+
+public enum InputChannelResult {
+
+ NONE,
+ INTERMEDIATE_RECORD_FROM_BUFFER,
+ LAST_RECORD_FROM_BUFFER,
+ END_OF_SUPERSTEP,
+ TASK_EVENT,
+ END_OF_STREAM;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
new file mode 100644
index 0000000..bdac7a2
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
@@ -0,0 +1,384 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.gates;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import eu.stratosphere.nephele.deployment.ChannelDeploymentDescriptor;
+import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
+import eu.stratosphere.nephele.execution.Environment;
+import eu.stratosphere.runtime.io.channels.InputChannel;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+/**
+ * In Nephele input gates are a specialization of general gates and connect input channels and record readers. As
+ * channels, input gates are always parameterized to a specific type of record which they can transport. In contrast to
+ * output gates input gates can be associated with a {@link eu.stratosphere.runtime.io.serialization.io.DistributionPattern} object which dictates the concrete
+ * wiring between two groups of vertices.
+ *
+ * @param <T> The type of record that can be transported through this gate.
+ */
+public class InputGate<T extends IOReadableWritable> extends Gate<T> implements BufferProvider, LocalBufferPoolOwner {
+
+ /**
+ * The log object used for debugging.
+ */
+ private static final Log LOG = LogFactory.getLog(InputGate.class);
+
+ /**
+ * The array of input channels attached to this input gate.
+ */
+ private InputChannel<T>[] channels;
+
+ /**
+ * Queue with indices of channels that store at least one available record.
+ */
+ private final BlockingQueue<Integer> availableChannels = new LinkedBlockingQueue<Integer>();
+
+ /**
+ * The listener object to be notified when a channel has at least one record available.
+ */
+ private final AtomicReference<RecordAvailabilityListener<T>> recordAvailabilityListener = new AtomicReference<RecordAvailabilityListener<T>>(null);
+
+
+ private AbstractTaskEvent currentEvent;
+
+ /**
+ * If the value of this variable is set to <code>true</code>, the input gate is closed.
+ */
+ private boolean isClosed = false;
+
+ /**
+ * The channel to read from next.
+ */
+ private int channelToReadFrom = -1;
+
+ private LocalBufferPool bufferPool;
+
+ /**
+ * Constructs a new runtime input gate.
+ *
+ * @param jobID
+ * the ID of the job this input gate belongs to
+ * @param gateID
+ * the ID of the gate
+ * @param index
+ * the index assigned to this input gate at the {@link Environment} object
+ */
+ public InputGate(final JobID jobID, final GateID gateID, final int index) {
+ super(jobID, gateID, index);
+ }
+
+ public void initializeChannels(GateDeploymentDescriptor inputGateDescriptor){
+ channels = new InputChannel[inputGateDescriptor.getNumberOfChannelDescriptors()];
+
+ setChannelType(inputGateDescriptor.getChannelType());
+
+ final int nicdd = inputGateDescriptor.getNumberOfChannelDescriptors();
+
+ for(int i = 0; i < nicdd; i++){
+ final ChannelDeploymentDescriptor cdd = inputGateDescriptor.getChannelDescriptor(i);
+ channels[i] = new InputChannel<T>(this, i, cdd.getInputChannelID(),
+ cdd.getOutputChannelID(), getChannelType());
+ }
+ }
+
+ @Override
+ public boolean isInputGate() {
+ return true;
+ }
+
+ /**
+ * Returns the number of input channels associated with this input gate.
+ *
+ * @return the number of input channels associated with this input gate
+ */
+ public int getNumberOfInputChannels() {
+ return this.channels.length;
+ }
+
+ /**
+ * Returns the input channel from position <code>pos</code> of the gate's internal channel list.
+ *
+ * @param pos
+ * the position to retrieve the channel from
+ * @return the channel from the given position or <code>null</code> if such position does not exist.
+ */
+ public InputChannel<T> getInputChannel(int pos) {
+ return this.channels[pos];
+ }
+
+ public InputChannel<T>[] channels() {
+ return this.channels;
+ }
+
+ /**
+ * Reads a record from one of the associated input channels. Channels are read such that one buffer from a channel is
+ * consecutively consumed. The buffers in turn are consumed in the order in which they arrive.
+ * Note that this method is not guaranteed to return a record, because the currently available channel data may not always
+ * constitute an entire record, when events or partial records are part of the data.
+ *
+ * When called even though no data is available, this call will block until data is available, so this method should be called
+ * when waiting is desired (such as when synchronously consuming a single gate) or only when it is known that data is available
+ * (such as when reading a union of multiple input gates).
+ *
+ * @param target The record object into which to construct the complete record.
+ * @return The result indicating whether a complete record is available, a event is available, only incomplete data
+ * is available (NONE), or the gate is exhausted.
+ * @throws IOException Thrown when an error occurred in the network stack relating to this channel.
+ * @throws InterruptedException Thrown, when the thread working on this channel is interrupted.
+ */
+ public InputChannelResult readRecord(T target) throws IOException, InterruptedException {
+
+ if (this.channelToReadFrom == -1) {
+ if (this.isClosed()) {
+ return InputChannelResult.END_OF_STREAM;
+ }
+
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+
+ this.channelToReadFrom = waitForAnyChannelToBecomeAvailable();
+ }
+
+ InputChannelResult result = this.getInputChannel(this.channelToReadFrom).readRecord(target);
+ switch (result) {
+ case INTERMEDIATE_RECORD_FROM_BUFFER: // full record and we can stay on the same channel
+ return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+
+ case LAST_RECORD_FROM_BUFFER: // full record, but we must switch the channel afterwards
+ this.channelToReadFrom = -1;
+ return InputChannelResult.LAST_RECORD_FROM_BUFFER;
+
+ case END_OF_SUPERSTEP:
+ this.channelToReadFrom = -1;
+ return InputChannelResult.END_OF_SUPERSTEP;
+
+ case TASK_EVENT: // task event
+ this.currentEvent = this.getInputChannel(this.channelToReadFrom).getCurrentEvent();
+ this.channelToReadFrom = -1; // event always marks a unit as consumed
+ return InputChannelResult.TASK_EVENT;
+
+ case NONE: // internal event or an incomplete record that needs further chunks
+ // the current unit is exhausted
+ this.channelToReadFrom = -1;
+ return InputChannelResult.NONE;
+
+ case END_OF_STREAM: // channel is done
+ this.channelToReadFrom = -1;
+ return isClosed() ? InputChannelResult.END_OF_STREAM : InputChannelResult.NONE;
+
+ default: // silence the compiler
+ throw new RuntimeException();
+ }
+ }
+
+ public AbstractTaskEvent getCurrentEvent() {
+ AbstractTaskEvent e = this.currentEvent;
+ this.currentEvent = null;
+ return e;
+ }
+
+ /**
+ * Notify the gate that the channel with the given index has
+ * at least one record available.
+ *
+ * @param channelIndex
+ * the index of the channel which has at least one record available
+ */
+ public void notifyRecordIsAvailable(int channelIndex) {
+ this.availableChannels.add(Integer.valueOf(channelIndex));
+
+ RecordAvailabilityListener<T> listener = this.recordAvailabilityListener.get();
+ if (listener != null) {
+ listener.reportRecordAvailability(this);
+ }
+ }
+
+ /**
+ * This method returns the index of a channel which has at least
+ * one record available. The method may block until at least one
+ * channel has become ready.
+ *
+ * @return the index of the channel which has at least one record available
+ */
+ public int waitForAnyChannelToBecomeAvailable() throws InterruptedException {
+ return this.availableChannels.take().intValue();
+ }
+
+
+ @Override
+ public boolean isClosed() throws IOException, InterruptedException {
+
+ if (this.isClosed) {
+ return true;
+ }
+
+ for (int i = 0; i < this.getNumberOfInputChannels(); i++) {
+ final InputChannel<T> inputChannel = this.channels[i];
+ if (!inputChannel.isClosed()) {
+ return false;
+ }
+ }
+
+ this.isClosed = true;
+
+ return true;
+ }
+
+
+ /**
+ * Immediately closes the input gate and all its input channels. The corresponding
+ * output channels are notified. Any remaining records in any buffers or queue is considered
+ * irrelevant and is discarded.
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while closing the gate
+ * @throws InterruptedException
+ * thrown if the thread is interrupted while waiting for the gate to be closed
+ */
+ public void close() throws IOException, InterruptedException {
+
+ for (int i = 0; i < this.getNumberOfInputChannels(); i++) {
+ final InputChannel<T> inputChannel = this.channels[i];
+ inputChannel.close();
+ }
+
+ }
+
+
+ @Override
+ public String toString() {
+ return "Input " + super.toString();
+ }
+
+
+ @Override
+ public void publishEvent(AbstractEvent event) throws IOException, InterruptedException {
+
+ // Copy event to all connected channels
+ for(int i=0; i< getNumberOfChannels(); i++){
+ channels[i].transferEvent(event);
+ }
+ }
+
+
+ @Override
+ public void releaseAllChannelResources() {
+
+ for(int i=0; i< getNumberOfChannels(); i++){
+ channels[i].releaseAllResources();
+ }
+ }
+
+ /**
+ * Registers a {@link RecordAvailabilityListener} with this input gate.
+ *
+ * @param listener
+ * the listener object to be registered
+ */
+ public void registerRecordAvailabilityListener(final RecordAvailabilityListener<T> listener) {
+ if (!this.recordAvailabilityListener.compareAndSet(null, listener)) {
+ throw new IllegalStateException(this.recordAvailabilityListener
+ + " is already registered as a record availability listener");
+ }
+ }
+
+ /**
+ * Notify the gate that is has consumed a data unit from the channel with the given index
+ *
+ * @param channelIndex
+ * the index of the channel from which a data unit has been consumed
+ */
+ public void notifyDataUnitConsumed(int channelIndex) {
+ this.channelToReadFrom = -1;
+ }
+
+ //
+
+ @Override
+ public Buffer requestBuffer(int minBufferSize) throws IOException {
+ return this.bufferPool.requestBuffer(minBufferSize);
+ }
+
+ @Override
+ public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
+ return this.bufferPool.requestBufferBlocking(minBufferSize);
+ }
+
+ @Override
+ public int getBufferSize() {
+ return this.bufferPool.getBufferSize();
+ }
+
+ @Override
+ public int getNumberOfChannels() {
+ return getNumberOfInputChannels();
+ }
+
+ @Override
+ public void setDesignatedNumberOfBuffers(int numBuffers) {
+ this.bufferPool.setNumDesignatedBuffers(numBuffers);
+ }
+
+ @Override
+ public void clearLocalBufferPool() {
+ this.bufferPool.destroy();
+ }
+
+ @Override
+ public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
+ this.bufferPool = new LocalBufferPool(globalBufferPool, 1);
+ }
+
+ @Override
+ public void logBufferUtilization() {
+ LOG.info(String.format("\t%s: %d available, %d requested, %d designated",
+ this,
+ this.bufferPool.numAvailableBuffers(),
+ this.bufferPool.numRequestedBuffers(),
+ this.bufferPool.numDesignatedBuffers()));
+ }
+
+ @Override
+ public void reportAsynchronousEvent() {
+ this.bufferPool.reportAsynchronousEvent();
+ }
+
+ @Override
+ public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+ return this.bufferPool.registerBufferAvailabilityListener(listener);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
new file mode 100644
index 0000000..d3eaea1
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
@@ -0,0 +1,165 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.gates;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.deployment.ChannelDeploymentDescriptor;
+import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.OutputChannel;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+import java.io.IOException;
+
+public class OutputGate extends Gate<IOReadableWritable> {
+
+ private OutputChannel[] channels;
+
+ private boolean closed;
+
+ /**
+ * Constructs a new output gate.
+ *
+ * @param jobId the ID of the job this input gate belongs to
+ * @param gateId the ID of the gate
+ * @param index the index assigned to this output gate at the Environment object
+ */
+ public OutputGate(JobID jobId, GateID gateId, int index) {
+ super(jobId, gateId, index);
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // Data processing
+ // -----------------------------------------------------------------------------------------------------------------
+
+ public void sendBuffer(Buffer buffer, int targetChannel) throws IOException, InterruptedException {
+ this.channels[targetChannel].sendBuffer(buffer);
+ }
+
+ public void sendEvent(AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
+ this.channels[targetChannel].sendEvent(event);
+ }
+
+ public void sendBufferAndEvent(Buffer buffer, AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
+ this.channels[targetChannel].sendBufferAndEvent(buffer, event);
+ }
+
+ public void broadcastBuffer(Buffer buffer) throws IOException, InterruptedException {
+ for (int i = 1; i < this.channels.length; i++) {
+ channels[i].sendBuffer(buffer.duplicate());
+ }
+ channels[0].sendBuffer(buffer);
+ }
+
+ public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
+ for (OutputChannel channel : this.channels) {
+ channel.sendEvent(event);
+ }
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // Channels
+ // -----------------------------------------------------------------------------------------------------------------
+
+ public void initializeChannels(GateDeploymentDescriptor descriptor) {
+ int numChannels = descriptor.getNumberOfChannelDescriptors();
+ this.channels = new OutputChannel[numChannels];
+
+ setChannelType(descriptor.getChannelType());
+
+ for (int i = 0; i < numChannels; i++) {
+ ChannelDeploymentDescriptor channelDescriptor = descriptor.getChannelDescriptor(i);
+
+ ChannelID id = channelDescriptor.getOutputChannelID();
+ ChannelID connectedId = channelDescriptor.getInputChannelID();
+
+ this.channels[i] = new OutputChannel(this, i, id, connectedId, getChannelType());
+ }
+ }
+
+ public OutputChannel[] channels() {
+ return this.channels;
+ }
+
+ public OutputChannel getChannel(int index) {
+ return (index < this.channels.length) ? this.channels[index] : null;
+ }
+
+ public int getNumChannels() {
+ return this.channels.length;
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // Shutdown
+ // -----------------------------------------------------------------------------------------------------------------
+
+ public void requestClose() throws IOException, InterruptedException {
+ for (OutputChannel channel : this.channels) {
+ channel.requestClose();
+ }
+ }
+
+ @Override
+ public boolean isClosed() {
+ if (this.closed) {
+ return true;
+ }
+
+ for (OutputChannel channel : this.channels) {
+ if (!channel.isClosed()) {
+ return false;
+ }
+ }
+
+ this.closed = true;
+ return true;
+ }
+
+ public void waitForGateToBeClosed() throws InterruptedException {
+ if (this.closed) {
+ return;
+ }
+
+ for (OutputChannel channel : this.channels) {
+ channel.waitForChannelToBeClosed();
+ }
+
+ this.closed = true;
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean isInputGate() {
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "Output " + super.toString();
+ }
+
+ @Override
+ public void publishEvent(AbstractEvent event) throws IOException, InterruptedException {
+ // replaced by broadcastEvent(AbstractEvent) => TODO will be removed with changes to input side
+ }
+
+ @Override
+ public void releaseAllChannelResources() {
+ // nothing to do for buffer oriented runtime => TODO will be removed with changes to input side
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/RecordAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/RecordAvailabilityListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/RecordAvailabilityListener.java
new file mode 100644
index 0000000..ea1d865
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/RecordAvailabilityListener.java
@@ -0,0 +1,36 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.gates;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.runtime.io.gates.InputGate;
+
+/**
+ * This interface can be implemented by a class which shall be notified by an input gate when one of the its connected
+ * input channels has at least one record available for reading.
+ *
+ * @param <T>
+ * the type of record transported through the corresponding input gate
+ */
+public interface RecordAvailabilityListener<T extends IOReadableWritable> {
+
+ /**
+ * This method is called by an input gate when one of its connected input channels has at least one record available
+ * for reading.
+ *
+ * @param inputGate
+ * the input gate which has at least one record available
+ */
+ void reportRecordAvailability(InputGate<T> inputGate);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
new file mode 100644
index 0000000..405d79e
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
@@ -0,0 +1,646 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.execution.CancelTaskException;
+import eu.stratosphere.nephele.execution.Environment;
+import eu.stratosphere.nephele.execution.RuntimeEnvironment;
+import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
+import eu.stratosphere.runtime.io.channels.Channel;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.InputChannel;
+import eu.stratosphere.runtime.io.channels.OutputChannel;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
+import eu.stratosphere.nephele.taskmanager.Task;
+import eu.stratosphere.nephele.AbstractID;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
+import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
+import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
+import eu.stratosphere.runtime.io.network.bufferprovider.SerialSingleBufferPool;
+import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.network.envelope.EnvelopeDispatcher;
+import eu.stratosphere.runtime.io.network.envelope.EnvelopeReceiverList;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.runtime.io.gates.OutputGate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The channel manager sets up the network buffers and dispatches data between channels.
+ */
+public final class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker {
+
+ private static final Log LOG = LogFactory.getLog(ChannelManager.class);
+
+ private final ChannelLookupProtocol channelLookupService;
+
+ private final InstanceConnectionInfo connectionInfo;
+
+ private final Map<ChannelID, Channel> channels;
+
+ private final Map<AbstractID, LocalBufferPoolOwner> localBuffersPools;
+
+ private final Map<ChannelID, EnvelopeReceiverList> receiverCache;
+
+ private final GlobalBufferPool globalBufferPool;
+
+ private final NetworkConnectionManager networkConnectionManager;
+
+ private final InetSocketAddress ourAddress;
+
+ private final SerialSingleBufferPool discardingDataPool;
+
+ // -----------------------------------------------------------------------------------------------------------------
+
+ public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnectionInfo connectionInfo,
+ int numNetworkBuffers, int networkBufferSize) throws IOException {
+ this.channelLookupService = channelLookupService;
+ this.connectionInfo = connectionInfo;
+ this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
+ this.networkConnectionManager = new NetworkConnectionManager(this, connectionInfo.address(), connectionInfo.dataPort());
+
+ // management data structures
+ this.channels = new ConcurrentHashMap<ChannelID, Channel>();
+ this.receiverCache = new ConcurrentHashMap<ChannelID, EnvelopeReceiverList>();
+ this.localBuffersPools = new ConcurrentHashMap<AbstractID, LocalBufferPoolOwner>();
+
+ this.ourAddress = new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort());
+
+ // a special pool if the data is to be discarded
+ this.discardingDataPool = new SerialSingleBufferPool(networkBufferSize);
+ }
+
+ public void shutdown() {
+ this.networkConnectionManager.shutDown();
+ this.globalBufferPool.destroy();
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // Task registration
+ // -----------------------------------------------------------------------------------------------------------------
+
+ /**
+ * Registers the given task with the channel manager.
+ *
+ * @param task the task to be registered
+ * @throws InsufficientResourcesException thrown if not enough buffers available to safely run this task
+ */
+ public void register(Task task) throws InsufficientResourcesException {
+ // Check if we can safely run this task with the given buffers
+ ensureBufferAvailability(task);
+
+ RuntimeEnvironment environment = task.getRuntimeEnvironment();
+
+ // -------------------------------------------------------------------------------------------------------------
+ // Register output channels
+ // -------------------------------------------------------------------------------------------------------------
+
+ environment.registerGlobalBufferPool(this.globalBufferPool);
+
+ if (this.localBuffersPools.containsKey(task.getVertexID())) {
+ throw new IllegalStateException("Vertex " + task.getVertexID() + " has a previous buffer pool owner");
+ }
+
+ for (OutputGate gate : environment.outputGates()) {
+ // add receiver list hints
+ for (OutputChannel channel : gate.channels()) {
+ // register envelope dispatcher with the channel
+ channel.registerEnvelopeDispatcher(this);
+
+ switch (channel.getChannelType()) {
+ case IN_MEMORY:
+ addReceiverListHint(channel.getID(), channel.getConnectedId());
+ break;
+ case NETWORK:
+ addReceiverListHint(channel.getConnectedId(), channel.getID());
+ break;
+ }
+
+ this.channels.put(channel.getID(), channel);
+ }
+ }
+
+ this.localBuffersPools.put(task.getVertexID(), environment);
+
+ // -------------------------------------------------------------------------------------------------------------
+ // Register input channels
+ // -------------------------------------------------------------------------------------------------------------
+
+ // register global
+ for (InputGate<?> gate : environment.inputGates()) {
+ gate.registerGlobalBufferPool(this.globalBufferPool);
+
+ for (int i = 0; i < gate.getNumberOfInputChannels(); i++) {
+ InputChannel<? extends IOReadableWritable> channel = gate.getInputChannel(i);
+ channel.registerEnvelopeDispatcher(this);
+
+ if (channel.getChannelType() == ChannelType.IN_MEMORY) {
+ addReceiverListHint(channel.getID(), channel.getConnectedId());
+ }
+
+ this.channels.put(channel.getID(), channel);
+ }
+
+ this.localBuffersPools.put(gate.getGateID(), gate);
+ }
+
+ // the number of channels per buffers has changed after unregistering the task
+ // => redistribute the number of designated buffers of the registered local buffer pools
+ redistributeBuffers();
+ }
+
+ /**
+ * Unregisters the given task from the channel manager.
+ *
+ * @param vertexId the ID of the task to be unregistered
+ * @param task the task to be unregistered
+ */
+ public void unregister(ExecutionVertexID vertexId, Task task) {
+ final Environment environment = task.getEnvironment();
+
+ // destroy and remove OUTPUT channels from registered channels and cache
+ for (ChannelID id : environment.getOutputChannelIDs()) {
+ Channel channel = this.channels.remove(id);
+ if (channel != null) {
+ channel.destroy();
+ }
+
+ this.receiverCache.remove(channel);
+ }
+
+ // destroy and remove INPUT channels from registered channels and cache
+ for (ChannelID id : environment.getInputChannelIDs()) {
+ Channel channel = this.channels.remove(id);
+ if (channel != null) {
+ channel.destroy();
+ }
+
+ this.receiverCache.remove(channel);
+ }
+
+ // clear and remove INPUT side buffer pools
+ for (GateID id : environment.getInputGateIDs()) {
+ LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(id);
+ if (bufferPool != null) {
+ bufferPool.clearLocalBufferPool();
+ }
+ }
+
+ // clear and remove OUTPUT side buffer pool
+ LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(vertexId);
+ if (bufferPool != null) {
+ bufferPool.clearLocalBufferPool();
+ }
+
+ // the number of channels per buffers has changed after unregistering the task
+ // => redistribute the number of designated buffers of the registered local buffer pools
+ redistributeBuffers();
+ }
+
+ /**
+ * Ensures that the channel manager has enough buffers to execute the given task.
+ * <p>
+ * If there is less than one buffer per channel available, an InsufficientResourcesException will be thrown,
+ * because of possible deadlocks. With more then one buffer per channel, deadlock-freedom is guaranteed.
+ *
+ * @param task task to be executed
+ * @throws InsufficientResourcesException thrown if not enough buffers available to execute the task
+ */
+ private void ensureBufferAvailability(Task task) throws InsufficientResourcesException {
+ Environment env = task.getEnvironment();
+
+ int numBuffers = this.globalBufferPool.numBuffers();
+ // existing channels + channels of the task
+ int numChannels = this.channels.size() + env.getNumberOfOutputChannels() + env.getNumberOfInputChannels();
+
+ // need at least one buffer per channel
+ if (numBuffers / numChannels < 1) {
+ String msg = String.format("%s has not enough buffers to safely execute %s (%d buffers missing)",
+ this.connectionInfo.hostname(), env.getTaskName(), numChannels - numBuffers);
+
+ throw new InsufficientResourcesException(msg);
+ }
+ }
+
+ /**
+ * Redistributes the buffers among the registered buffer pools. This method is called after each task registration
+ * and unregistration.
+ * <p>
+ * Every registered buffer pool gets buffers according to its number of channels weighted by the current buffer to
+ * channel ratio.
+ */
+ private void redistributeBuffers() {
+ if (this.localBuffersPools.isEmpty() | this.channels.size() == 0) {
+ return;
+ }
+
+ int numBuffers = this.globalBufferPool.numBuffers();
+ int numChannels = this.channels.size();
+
+ double buffersPerChannel = numBuffers / (double) numChannels;
+
+ if (buffersPerChannel < 1.0) {
+ throw new RuntimeException("System has not enough buffers to execute tasks.");
+ }
+
+ // redistribute number of designated buffers per buffer pool
+ for (LocalBufferPoolOwner bufferPool : this.localBuffersPools.values()) {
+ int numDesignatedBuffers = (int) Math.ceil(buffersPerChannel * bufferPool.getNumberOfChannels());
+ bufferPool.setDesignatedNumberOfBuffers(numDesignatedBuffers);
+ }
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // Envelope processing
+ // -----------------------------------------------------------------------------------------------------------------
+
+ private void releaseEnvelope(Envelope envelope) {
+ Buffer buffer = envelope.getBuffer();
+ if (buffer != null) {
+ buffer.recycleBuffer();
+ }
+ }
+
+ private void addReceiverListHint(ChannelID source, ChannelID localReceiver) {
+ EnvelopeReceiverList receiverList = new EnvelopeReceiverList(localReceiver);
+
+ if (this.receiverCache.put(source, receiverList) != null) {
+ LOG.warn("Receiver cache already contained entry for " + source);
+ }
+ }
+
+ private void addReceiverListHint(ChannelID source, RemoteReceiver remoteReceiver) {
+ EnvelopeReceiverList receiverList = new EnvelopeReceiverList(remoteReceiver);
+
+ if (this.receiverCache.put(source, receiverList) != null) {
+ LOG.warn("Receiver cache already contained entry for " + source);
+ }
+ }
+
+ private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) {
+ Channel channel = this.channels.get(envelope.getSource());
+ if (channel == null) {
+ LOG.error("Cannot find channel for channel ID " + envelope.getSource());
+ return;
+ }
+
+ // Only generate sender hints for output channels
+ if (channel.isInputChannel()) {
+ return;
+ }
+
+ final ChannelID targetChannelID = channel.getConnectedId();
+ final int connectionIndex = receiver.getConnectionIndex();
+
+ final RemoteReceiver ourAddress = new RemoteReceiver(this.ourAddress, connectionIndex);
+ final Envelope senderHint = SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress);
+
+ this.networkConnectionManager.queueEnvelopeForTransfer(receiver, senderHint);
+ }
+
+ /**
+ * Returns the list of receivers for transfer envelopes produced by the channel with the given source channel ID.
+ *
+ * @param jobID
+ * the ID of the job the given channel ID belongs to
+ * @param sourceChannelID
+ * the source channel ID for which the receiver list shall be retrieved
+ * @return the list of receivers or <code>null</code> if the receiver could not be determined
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private EnvelopeReceiverList getReceiverList(JobID jobID, ChannelID sourceChannelID, boolean reportException) throws IOException {
+ EnvelopeReceiverList receiverList = this.receiverCache.get(sourceChannelID);
+
+ if (receiverList != null) {
+ return receiverList;
+ }
+
+ while (true) {
+ ConnectionInfoLookupResponse lookupResponse;
+ synchronized (this.channelLookupService) {
+ lookupResponse = this.channelLookupService.lookupConnectionInfo(this.connectionInfo, jobID, sourceChannelID);
+ }
+
+ if (lookupResponse.receiverReady()) {
+ receiverList = new EnvelopeReceiverList(lookupResponse);
+ break;
+ }
+ else if (lookupResponse.receiverNotReady()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ if (reportException) {
+ throw new IOException("Lookup was interrupted.");
+ } else {
+ return null;
+ }
+ }
+ }
+ else if (lookupResponse.isJobAborting()) {
+ if (reportException) {
+ throw new CancelTaskException();
+ } else {
+ return null;
+ }
+ }
+ else if (lookupResponse.receiverNotFound()) {
+ if (reportException) {
+ throw new IOException("Could not find the receiver for Job " + jobID + ", channel with source id " + sourceChannelID);
+ } else {
+ return null;
+ }
+ }
+ else {
+ throw new IllegalStateException("Unrecognized response to channel lookup.");
+ }
+ }
+
+ this.receiverCache.put(sourceChannelID, receiverList);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Receivers for source channel ID " + sourceChannelID + " at task manager " + this.connectionInfo +
+ ": " + receiverList);
+ }
+
+ return receiverList;
+ }
+
+ /**
+ * Invalidates the entries identified by the given channel IDs from the receiver lookup cache.
+ *
+ * @param channelIDs channel IDs for entries to invalidate
+ */
+ public void invalidateLookupCacheEntries(Set<ChannelID> channelIDs) {
+ for (ChannelID id : channelIDs) {
+ this.receiverCache.remove(id);
+ }
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // EnvelopeDispatcher methods
+ // -----------------------------------------------------------------------------------------------------------------
+
+ @Override
+ public void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException {
+ EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, true);
+
+ Buffer srcBuffer = envelope.getBuffer();
+ Buffer destBuffer = null;
+
+ boolean success = false;
+
+ try {
+ if (receiverList.hasLocalReceiver()) {
+ ChannelID receiver = receiverList.getLocalReceiver();
+ Channel channel = this.channels.get(receiver);
+
+ if (channel == null) {
+ throw new LocalReceiverCancelledException(receiver);
+ }
+
+ if (!channel.isInputChannel()) {
+ throw new IOException("Local receiver " + receiver + " is not an input channel.");
+ }
+
+ InputChannel<?> inputChannel = (InputChannel<?>) channel;
+
+ // copy the buffer into the memory space of the receiver
+ if (srcBuffer != null) {
+ try {
+ destBuffer = inputChannel.requestBufferBlocking(srcBuffer.size());
+ } catch (InterruptedException e) {
+ throw new IOException(e.getMessage());
+ }
+
+ srcBuffer.copyToBuffer(destBuffer);
+ envelope.setBuffer(destBuffer);
+ srcBuffer.recycleBuffer();
+ }
+
+ inputChannel.queueEnvelope(envelope);
+ success = true;
+ }
+ else if (receiverList.hasRemoteReceiver()) {
+ RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver();
+
+ // Generate sender hint before sending the first envelope over the network
+ if (envelope.getSequenceNumber() == 0) {
+ generateSenderHint(envelope, remoteReceiver);
+ }
+
+ this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, envelope);
+ success = true;
+ }
+ } finally {
+ if (!success) {
+ if (srcBuffer != null) {
+ srcBuffer.recycleBuffer();
+ }
+ if (destBuffer != null) {
+ destBuffer.recycleBuffer();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException {
+ // this method sends only events back from input channels to output channels
+ // sanity check that we have no buffer
+ if (envelope.getBuffer() != null) {
+ throw new RuntimeException("Error: This method can only process envelopes without buffers.");
+ }
+
+ EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, true);
+
+ if (receiverList.hasLocalReceiver()) {
+ ChannelID receiver = receiverList.getLocalReceiver();
+ Channel channel = this.channels.get(receiver);
+
+ if (channel == null) {
+ throw new LocalReceiverCancelledException(receiver);
+ }
+
+ if (channel.isInputChannel()) {
+ throw new IOException("Local receiver " + receiver + " of backward event is not an output channel.");
+ }
+
+ OutputChannel outputChannel = (OutputChannel) channel;
+ outputChannel.queueEnvelope(envelope);
+ }
+ else if (receiverList.hasRemoteReceiver()) {
+ RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver();
+
+ // Generate sender hint before sending the first envelope over the network
+ if (envelope.getSequenceNumber() == 0) {
+ generateSenderHint(envelope, remoteReceiver);
+ }
+
+ this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, envelope);
+ }
+ }
+
+ /**
+ *
+ */
+ @Override
+ public void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException {
+ // ========================================================================================
+ // IMPORTANT
+ //
+ // This method is called by the network I/O thread that reads the incoming TCP
+ // connections. This method must have minimal overhead and not throw exception if
+ // something is wrong with a job or individual transmission, but only when something
+ // is fundamentally broken in the system.
+ // ========================================================================================
+
+ // the sender hint event is to let the receiver know where exactly the envelope came from.
+ // the receiver will cache the sender id and its connection info in its local lookup table
+ // that allows the receiver to send envelopes to the sender without first pinging the job manager
+ // for the sender's connection info
+
+ // Check if the envelope is the special envelope with the sender hint event
+ if (SenderHintEvent.isSenderHintEvent(envelope)) {
+ // Check if this is the final destination of the sender hint event before adding it
+ final SenderHintEvent seh = (SenderHintEvent) envelope.deserializeEvents().get(0);
+ if (this.channels.get(seh.getSource()) != null) {
+ addReceiverListHint(seh.getSource(), seh.getRemoteReceiver());
+ return;
+ }
+ }
+
+ // try and get the receiver list. if we cannot get it anymore, the task has been cleared
+ // the code frees the envelope on exception, so we need not to anything
+ EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, false);
+ if (receiverList == null) {
+ // receiver is cancelled and cleaned away
+ releaseEnvelope(envelope);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dropping envelope for cleaned up receiver.");
+ }
+
+ return;
+ }
+
+ if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) {
+ throw new IOException("Bug in network stack: Envelope dispatched from the incoming network pipe has no local receiver or has a remote receiver");
+ }
+
+ ChannelID localReceiver = receiverList.getLocalReceiver();
+ Channel channel = this.channels.get(localReceiver);
+
+ // if the channel is null, it means that receiver has been cleared already (cancelled or failed).
+ // release the buffer immediately
+ if (channel == null) {
+ releaseEnvelope(envelope);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Dropping envelope for cancelled receiver " + localReceiver);
+ }
+ }
+ else {
+ channel.queueEnvelope(envelope);
+ }
+ }
+
+ /**
+ *
+ * Upon an exception, this method frees the envelope.
+ *
+ * @param envelope
+ * @return
+ * @throws IOException
+ */
+ private final EnvelopeReceiverList getReceiverListForEnvelope(Envelope envelope, boolean reportException) throws IOException {
+ try {
+ return getReceiverList(envelope.getJobID(), envelope.getSource(), reportException);
+ } catch (IOException e) {
+ releaseEnvelope(envelope);
+ throw e;
+ } catch (CancelTaskException e) {
+ releaseEnvelope(envelope);
+ throw e;
+ }
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // BufferProviderBroker methods
+ // -----------------------------------------------------------------------------------------------------------------
+
+ @Override
+ public BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) throws IOException {
+ EnvelopeReceiverList receiverList = getReceiverList(jobID, sourceChannelID, false);
+
+ // check if the receiver is already gone
+ if (receiverList == null) {
+ return this.discardingDataPool;
+ }
+
+ if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) {
+ throw new IOException("The destination to be looked up is not a single local endpoint.");
+ }
+
+
+ ChannelID localReceiver = receiverList.getLocalReceiver();
+ Channel channel = this.channels.get(localReceiver);
+
+ if (channel == null) {
+ // receiver is already canceled
+ return this.discardingDataPool;
+ }
+
+ if (!channel.isInputChannel()) {
+ throw new IOException("Channel context for local receiver " + localReceiver + " is not an input channel context");
+ }
+
+ return (InputChannel<?>) channel;
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+
+ public void logBufferUtilization() {
+ System.out.println("Buffer utilization at " + System.currentTimeMillis());
+
+ System.out.println("\tUnused global buffers: " + this.globalBufferPool.numAvailableBuffers());
+
+ System.out.println("\tLocal buffer pool status:");
+
+ for (LocalBufferPoolOwner bufferPool : this.localBuffersPools.values()) {
+ bufferPool.logBufferUtilization();
+ }
+
+ this.networkConnectionManager.logBufferUtilization();
+
+ System.out.println("\tIncoming connections:");
+
+ for (Channel channel : this.channels.values()) {
+ if (channel.isInputChannel()) {
+ ((InputChannel<?>) channel).logQueuedEnvelopes();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ConnectionInfoLookupResponse.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ConnectionInfoLookupResponse.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ConnectionInfoLookupResponse.java
new file mode 100644
index 0000000..aeb5025
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ConnectionInfoLookupResponse.java
@@ -0,0 +1,143 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+
+public class ConnectionInfoLookupResponse implements IOReadableWritable {
+
+ private enum ReturnCode {
+ NOT_FOUND, FOUND_AND_RECEIVER_READY, FOUND_BUT_RECEIVER_NOT_READY, JOB_IS_ABORTING
+ };
+
+ // was request successful?
+ private ReturnCode returnCode;
+
+ private RemoteReceiver remoteTarget;
+
+ private ChannelID localTarget;
+
+
+ public ConnectionInfoLookupResponse() {}
+
+ public ConnectionInfoLookupResponse(ReturnCode code) {
+ this.returnCode = code;
+ this.remoteTarget = null;
+ this.localTarget = null;
+ }
+
+ public ConnectionInfoLookupResponse(ReturnCode code, ChannelID localTarget) {
+ this.returnCode = code;
+ this.remoteTarget = null;
+ this.localTarget = localTarget;
+ }
+
+ public ConnectionInfoLookupResponse(ReturnCode code, RemoteReceiver receiver) {
+ this.returnCode = code;
+ this.remoteTarget = receiver;
+ this.localTarget = null;
+ }
+
+ public RemoteReceiver getRemoteTarget() {
+ return this.remoteTarget;
+ }
+
+ public ChannelID getLocalTarget() {
+ return this.localTarget;
+ }
+
+ @Override
+ public void read(DataInput in) throws IOException {
+ this.returnCode = ReturnCode.values()[in.readInt()];
+
+ if (in.readBoolean()) {
+ this.remoteTarget = new RemoteReceiver();
+ this.remoteTarget.read(in);
+ }
+ if (in.readBoolean()) {
+ this.localTarget = new ChannelID();
+ this.localTarget.read(in);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(this.returnCode.ordinal());
+
+ if (this.remoteTarget != null) {
+ out.writeBoolean(true);
+ this.remoteTarget.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+
+ if (this.localTarget != null) {
+ out.writeBoolean(true);
+ this.localTarget.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
+ }
+
+ public boolean receiverNotFound() {
+ return (this.returnCode == ReturnCode.NOT_FOUND);
+ }
+
+ public boolean receiverNotReady() {
+ return (this.returnCode == ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
+ }
+
+ public boolean receiverReady() {
+ return (this.returnCode == ReturnCode.FOUND_AND_RECEIVER_READY);
+ }
+
+ public boolean isJobAborting() {
+ return (this.returnCode == ReturnCode.JOB_IS_ABORTING);
+ }
+
+
+ public static ConnectionInfoLookupResponse createReceiverFoundAndReady(ChannelID targetChannelID) {
+ return new ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, targetChannelID);
+ }
+
+ public static ConnectionInfoLookupResponse createReceiverFoundAndReady(RemoteReceiver remoteReceiver) {
+ return new ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, remoteReceiver);
+ }
+
+ public static ConnectionInfoLookupResponse createReceiverNotFound() {
+ return new ConnectionInfoLookupResponse(ReturnCode.NOT_FOUND);
+ }
+
+ public static ConnectionInfoLookupResponse createReceiverNotReady() {
+ return new ConnectionInfoLookupResponse(ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
+ }
+
+ public static ConnectionInfoLookupResponse createJobIsAborting() {
+ return new ConnectionInfoLookupResponse(ReturnCode.JOB_IS_ABORTING);
+ }
+
+
+ @Override
+ public String toString() {
+ return this.returnCode.name() + ", local target: " + this.localTarget + ", remoteTarget: " + this.remoteTarget;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/InsufficientResourcesException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/InsufficientResourcesException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/InsufficientResourcesException.java
new file mode 100644
index 0000000..0d2fcd4
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/InsufficientResourcesException.java
@@ -0,0 +1,37 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+/**
+ * This exception is thrown by the {@link ChannelManager} to indicate that a task cannot be accepted because
+ * there are not enough resources available to safely execute it.
+ *
+ */
+public final class InsufficientResourcesException extends Exception {
+
+ /**
+ * The generated serial version UID.
+ */
+ private static final long serialVersionUID = -8977049569413215169L;
+
+ /**
+ * Constructs a new insufficient resources exception.
+ *
+ * @param msg
+ * the message describing the exception
+ */
+ InsufficientResourcesException(final String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalReceiverCancelledException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalReceiverCancelledException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalReceiverCancelledException.java
new file mode 100644
index 0000000..769ada5
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalReceiverCancelledException.java
@@ -0,0 +1,37 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.runtime.io.network;
+
+import eu.stratosphere.nephele.execution.CancelTaskException;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+
+
+/**
+ *
+ */
+public class LocalReceiverCancelledException extends CancelTaskException {
+ private static final long serialVersionUID = 1L;
+
+ private final ChannelID receiver;
+
+ public LocalReceiverCancelledException(ChannelID receiver) {
+ this.receiver = receiver;
+ }
+
+
+ public ChannelID getReceiver() {
+ return receiver;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
new file mode 100644
index 0000000..44ec642
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
@@ -0,0 +1,176 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.configuration.GlobalConfiguration;
+import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.network.tcp.IncomingConnectionThread;
+import eu.stratosphere.runtime.io.network.tcp.OutgoingConnection;
+import eu.stratosphere.runtime.io.network.tcp.OutgoingConnectionThread;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * The network connection manager manages incoming and outgoing network connection from and to other hosts.
+ * <p>
+ * This class is thread-safe.
+ *
+ */
+public final class NetworkConnectionManager {
+
+ /**
+ * The default number of threads dealing with outgoing connections.
+ */
+ private static final int DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS = 1;
+
+ /**
+ * The default number of connection retries before giving up.
+ */
+ private static final int DEFAULT_NUMBER_OF_CONNECTION_RETRIES = 10;
+
+ /**
+ * List of active threads dealing with outgoing connections.
+ */
+ private final List<OutgoingConnectionThread> outgoingConnectionThreads = new CopyOnWriteArrayList<OutgoingConnectionThread>();
+
+ /**
+ * Thread dealing with incoming connections.
+ */
+ private final IncomingConnectionThread incomingConnectionThread;
+
+ /**
+ * Map containing currently active outgoing connections.
+ */
+ private final ConcurrentMap<RemoteReceiver, OutgoingConnection> outgoingConnections = new ConcurrentHashMap<RemoteReceiver, OutgoingConnection>();
+
+ /**
+ * The number of connection retries before giving up.
+ */
+ private final int numberOfConnectionRetries;
+
+ /**
+ * A buffer provider for read buffers
+ */
+ private final ChannelManager channelManager;
+
+ public NetworkConnectionManager(final ChannelManager channelManager,
+ final InetAddress bindAddress, final int dataPort) throws IOException {
+
+ final Configuration configuration = GlobalConfiguration.getConfiguration();
+
+ this.channelManager = channelManager;
+
+ // Start the connection threads
+ final int numberOfOutgoingConnectionThreads = configuration.getInteger(
+ "channel.network.numberOfOutgoingConnectionThreads", DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS);
+
+ for (int i = 0; i < numberOfOutgoingConnectionThreads; i++) {
+ final OutgoingConnectionThread outgoingConnectionThread = new OutgoingConnectionThread();
+ outgoingConnectionThread.start();
+ this.outgoingConnectionThreads.add(outgoingConnectionThread);
+ }
+
+ this.incomingConnectionThread = new IncomingConnectionThread(
+ this.channelManager, true, new InetSocketAddress(bindAddress, dataPort));
+ this.incomingConnectionThread.start();
+
+ this.numberOfConnectionRetries = configuration.getInteger("channel.network.numberOfConnectionRetries",
+ DEFAULT_NUMBER_OF_CONNECTION_RETRIES);
+ }
+
+ /**
+ * Randomly selects one of the active threads dealing with outgoing connections.
+ *
+ * @return one of the active threads dealing with outgoing connections
+ */
+ private OutgoingConnectionThread getOutgoingConnectionThread() {
+
+ return this.outgoingConnectionThreads.get((int) (this.outgoingConnectionThreads.size() * Math.random()));
+ }
+
+ /**
+ * Queues an envelope for transfer to a particular target host.
+ *
+ * @param remoteReceiver
+ * the address of the remote receiver
+ * @param envelope
+ * the envelope to be transfered
+ */
+ public void queueEnvelopeForTransfer(final RemoteReceiver remoteReceiver, final Envelope envelope) {
+
+ getOutgoingConnection(remoteReceiver).queueEnvelope(envelope);
+ }
+
+ /**
+ * Returns (and possibly creates) the outgoing connection for the given target address.
+ *
+ * @param targetAddress
+ * the address of the connection target
+ * @return the outgoing connection object
+ */
+ private OutgoingConnection getOutgoingConnection(final RemoteReceiver remoteReceiver) {
+
+ OutgoingConnection outgoingConnection = this.outgoingConnections.get(remoteReceiver);
+
+ if (outgoingConnection == null) {
+
+ outgoingConnection = new OutgoingConnection(remoteReceiver, getOutgoingConnectionThread(),
+ this.numberOfConnectionRetries);
+
+ final OutgoingConnection oldEntry = this.outgoingConnections
+ .putIfAbsent(remoteReceiver, outgoingConnection);
+
+ // We had a race, use the old value
+ if (oldEntry != null) {
+ outgoingConnection = oldEntry;
+ }
+ }
+
+ return outgoingConnection;
+ }
+
+ public void shutDown() {
+
+ // Interrupt the threads we started
+ this.incomingConnectionThread.interrupt();
+
+ final Iterator<OutgoingConnectionThread> it = this.outgoingConnectionThreads.iterator();
+ while (it.hasNext()) {
+ it.next().interrupt();
+ }
+ }
+
+ public void logBufferUtilization() {
+
+ System.out.println("\tOutgoing connections:");
+
+ final Iterator<Map.Entry<RemoteReceiver, OutgoingConnection>> it = this.outgoingConnections.entrySet()
+ .iterator();
+
+ while (it.hasNext()) {
+
+ final Map.Entry<RemoteReceiver, OutgoingConnection> entry = it.next();
+ System.out.println("\t\tOC " + entry.getKey() + ": " + entry.getValue().getNumberOfQueuedWriteBuffers());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java
new file mode 100644
index 0000000..da36ad0
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/RemoteReceiver.java
@@ -0,0 +1,157 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.util.StringUtils;
+
+/**
+ * Objects of this class uniquely identify a connection to a remote {@link TaskManager}.
+ *
+ */
+public final class RemoteReceiver implements IOReadableWritable {
+
+ /**
+ * The address of the connection to the remote {@link TaskManager}.
+ */
+ private InetSocketAddress connectionAddress;
+
+ /**
+ * The index of the connection to the remote {@link TaskManager}.
+ */
+ private int connectionIndex;
+
+ /**
+ * Constructs a new remote receiver object.
+ *
+ * @param connectionAddress
+ * the address of the connection to the remote {@link TaskManager}
+ * @param connectionIndex
+ * the index of the connection to the remote {@link TaskManager}
+ */
+ public RemoteReceiver(final InetSocketAddress connectionAddress, final int connectionIndex) {
+
+ if (connectionAddress == null) {
+ throw new IllegalArgumentException("Argument connectionAddress must not be null");
+ }
+
+ if (connectionIndex < 0) {
+ throw new IllegalArgumentException("Argument connectionIndex must be a non-negative integer number");
+ }
+
+ this.connectionAddress = connectionAddress;
+ this.connectionIndex = connectionIndex;
+ }
+
+ /**
+ * Default constructor for serialization/deserialization.
+ */
+ public RemoteReceiver() {
+ this.connectionAddress = null;
+ this.connectionIndex = -1;
+ }
+
+ /**
+ * Returns the address of the connection to the remote {@link TaskManager}.
+ *
+ * @return the address of the connection to the remote {@link TaskManager}
+ */
+ public InetSocketAddress getConnectionAddress() {
+
+ return this.connectionAddress;
+ }
+
+ /**
+ * Returns the index of the connection to the remote {@link TaskManager}.
+ *
+ * @return the index of the connection to the remote {@link TaskManager}
+ */
+ public int getConnectionIndex() {
+
+ return this.connectionIndex;
+ }
+
+
+ @Override
+ public int hashCode() {
+
+ return this.connectionAddress.hashCode() + (31 * this.connectionIndex);
+ }
+
+
+ @Override
+ public boolean equals(final Object obj) {
+
+ if (!(obj instanceof RemoteReceiver)) {
+ return false;
+ }
+
+ final RemoteReceiver rr = (RemoteReceiver) obj;
+ if (!this.connectionAddress.equals(rr.connectionAddress)) {
+ return false;
+ }
+
+ if (this.connectionIndex != rr.connectionIndex) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public void write(final DataOutput out) throws IOException {
+
+ final InetAddress ia = this.connectionAddress.getAddress();
+ out.writeInt(ia.getAddress().length);
+ out.write(ia.getAddress());
+ out.writeInt(this.connectionAddress.getPort());
+
+ out.writeInt(this.connectionIndex);
+ }
+
+
+ @Override
+ public void read(final DataInput in) throws IOException {
+
+ final int addr_length = in.readInt();
+ final byte[] address = new byte[addr_length];
+ in.readFully(address);
+
+ InetAddress ia = null;
+ try {
+ ia = InetAddress.getByAddress(address);
+ } catch (UnknownHostException uhe) {
+ throw new IOException(StringUtils.stringifyException(uhe));
+ }
+ final int port = in.readInt();
+ this.connectionAddress = new InetSocketAddress(ia, port);
+
+ this.connectionIndex = in.readInt();
+ }
+
+
+ @Override
+ public String toString() {
+
+ return this.connectionAddress + " (" + this.connectionIndex + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
new file mode 100644
index 0000000..32be058
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
@@ -0,0 +1,117 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.envelope.Envelope;
+
+public final class SenderHintEvent extends AbstractEvent {
+
+ /**
+ * The sequence number that will be set for transfer envelopes which contain the sender hint event.
+ */
+ private static final int SENDER_HINT_SEQUENCE_NUMBER = 0;
+
+ private final ChannelID source;
+
+ private final RemoteReceiver remoteReceiver;
+
+ SenderHintEvent(final ChannelID source, final RemoteReceiver remoteReceiver) {
+
+ if (source == null) {
+ throw new IllegalArgumentException("Argument source must not be null");
+ }
+
+ if (remoteReceiver == null) {
+ throw new IllegalArgumentException("Argument remoteReceiver must not be null");
+ }
+
+ this.source = source;
+ this.remoteReceiver = remoteReceiver;
+ }
+
+ public SenderHintEvent() {
+
+ this.source = new ChannelID();
+ this.remoteReceiver = new RemoteReceiver();
+ }
+
+ public ChannelID getSource() {
+
+ return this.source;
+ }
+
+ public RemoteReceiver getRemoteReceiver() {
+
+ return this.remoteReceiver;
+ }
+
+
+ @Override
+ public void write(final DataOutput out) throws IOException {
+
+ this.source.write(out);
+ this.remoteReceiver.write(out);
+ }
+
+
+ @Override
+ public void read(final DataInput in) throws IOException {
+
+ this.source.read(in);
+ this.remoteReceiver.read(in);
+ }
+
+ public static Envelope createEnvelopeWithEvent(final Envelope originalEnvelope,
+ final ChannelID source, final RemoteReceiver remoteReceiver) {
+
+ final Envelope envelope = new Envelope(SENDER_HINT_SEQUENCE_NUMBER,
+ originalEnvelope.getJobID(), originalEnvelope.getSource());
+
+ final SenderHintEvent senderEvent = new SenderHintEvent(source, remoteReceiver);
+ envelope.serializeEventList(Arrays.asList(senderEvent));
+
+ return envelope;
+ }
+
+ static boolean isSenderHintEvent(final Envelope envelope) {
+
+ if (envelope.getSequenceNumber() != SENDER_HINT_SEQUENCE_NUMBER) {
+ return false;
+ }
+
+ if (envelope.getBuffer() != null) {
+ return false;
+ }
+
+ List<? extends AbstractEvent> events = envelope.deserializeEvents();
+
+ if (events.size() != 1) {
+ return false;
+ }
+
+ if (!(events.get(0) instanceof SenderHintEvent)) {
+ return false;
+ }
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
new file mode 100644
index 0000000..1d23e93
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
@@ -0,0 +1,28 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+/**
+ * This interface must be implemented to receive a notification from a {@link BufferProvider} when an empty
+ * {@link eu.stratosphere.runtime.io.Buffer} has
+ * become available again.
+ *
+ */
+public interface BufferAvailabilityListener {
+
+ /**
+ * Indicates that at least one {@link eu.stratosphere.runtime.io.Buffer} has become available again.
+ */
+ void bufferAvailable();
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
new file mode 100644
index 0000000..e3085ee
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
@@ -0,0 +1,69 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+import eu.stratosphere.runtime.io.Buffer;
+
+import java.io.IOException;
+
+public interface BufferProvider {
+
+ /**
+ * Requests a buffer with a minimum size of <code>minBufferSize</code>. The method returns immediately, even if the
+ * request could not be fulfilled.
+ *
+ * @param minBufferSize minimum size of the requested buffer (in bytes)
+ * @return buffer with at least the requested size or <code>null</code> if no such buffer is currently available
+ * @throws IOException
+ */
+ Buffer requestBuffer(int minBufferSize) throws IOException;
+
+ /**
+ * Requests a buffer with a minimum size of <code>minBufferSize</code>. The method blocks until the request has
+ * been fulfilled or {@link #reportAsynchronousEvent()} has been called.
+ *
+ * @param minBufferSize minimum size of the requested buffer (in bytes)
+ * @return buffer with at least the requested size
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException;
+
+ /**
+ * Returns the size of buffers (in bytes) available at this buffer provider.
+ *
+ * @return size of buffers (in bytes) available at this buffer provider
+ */
+ int getBufferSize();
+
+ /**
+ * Reports an asynchronous event and interrupts each blocking method of this buffer provider in order to allow the
+ * blocked thread to respond to the event.
+ */
+ void reportAsynchronousEvent();
+
+ /**
+ * Registers the given {@link BufferAvailabilityListener} with an empty buffer pool.
+ * <p>
+ * The registration only succeeds, if the buffer pool is empty and has not been destroyed yet.
+ * <p>
+ * The registered listener will receive a notification when at least one buffer has become available again. After
+ * the notification, the listener will be unregistered.
+ *
+ * @param listener the listener to be registered
+ * @return <code>true</code> if the registration has been successful; <code>false</code> if the registration
+ * failed, because the buffer pool was not empty or has already been destroyed
+ */
+ boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener);
+}