You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:25 UTC
[28/34] Replace custom Java NIO TCP/IP code with Netty 4 library
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
index 405d79e..7b01f4e 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
@@ -31,15 +31,13 @@ import eu.stratosphere.nephele.AbstractID;
import eu.stratosphere.runtime.io.Buffer;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
+import eu.stratosphere.runtime.io.network.bufferprovider.DiscardBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
-import eu.stratosphere.runtime.io.network.bufferprovider.SerialSingleBufferPool;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeDispatcher;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeReceiverList;
import eu.stratosphere.runtime.io.gates.GateID;
import eu.stratosphere.runtime.io.gates.InputGate;
import eu.stratosphere.runtime.io.gates.OutputGate;
+import eu.stratosphere.runtime.io.network.netty.NettyConnectionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,7 +50,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* The channel manager sets up the network buffers and dispatches data between channels.
*/
-public final class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker {
+public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker {
private static final Log LOG = LogFactory.getLog(ChannelManager.class);
@@ -68,20 +66,27 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
private final GlobalBufferPool globalBufferPool;
- private final NetworkConnectionManager networkConnectionManager;
+ private final NettyConnectionManager nettyConnectionManager;
private final InetSocketAddress ourAddress;
- private final SerialSingleBufferPool discardingDataPool;
+ private final DiscardBufferPool discardBufferPool;
// -----------------------------------------------------------------------------------------------------------------
public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnectionInfo connectionInfo,
- int numNetworkBuffers, int networkBufferSize) throws IOException {
+ int numNetworkBuffers, int networkBufferSize,
+ int numInThreads, int numOutThreads,
+ int lowWatermark, int highWaterMark) throws IOException {
+
this.channelLookupService = channelLookupService;
this.connectionInfo = connectionInfo;
+
this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
- this.networkConnectionManager = new NetworkConnectionManager(this, connectionInfo.address(), connectionInfo.dataPort());
+
+ this.nettyConnectionManager = new NettyConnectionManager(
+ this, connectionInfo.address(), connectionInfo.dataPort(),
+ networkBufferSize, numInThreads, numOutThreads, lowWatermark, highWaterMark);
// management data structures
this.channels = new ConcurrentHashMap<ChannelID, Channel>();
@@ -91,11 +96,11 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
this.ourAddress = new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort());
// a special pool if the data is to be discarded
- this.discardingDataPool = new SerialSingleBufferPool(networkBufferSize);
+ this.discardBufferPool = new DiscardBufferPool();
}
public void shutdown() {
- this.networkConnectionManager.shutDown();
+ this.nettyConnectionManager.shutdown();
this.globalBufferPool.destroy();
}
@@ -301,7 +306,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
}
}
- private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) {
+ private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) throws IOException {
Channel channel = this.channels.get(envelope.getSource());
if (channel == null) {
LOG.error("Cannot find channel for channel ID " + envelope.getSource());
@@ -319,7 +324,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
final RemoteReceiver ourAddress = new RemoteReceiver(this.ourAddress, connectionIndex);
final Envelope senderHint = SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress);
- this.networkConnectionManager.queueEnvelopeForTransfer(receiver, senderHint);
+ this.nettyConnectionManager.enqueue(senderHint, receiver);
}
/**
@@ -331,7 +336,6 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
* the source channel ID for which the receiver list shall be retrieved
* @return the list of receivers or <code>null</code> if the receiver could not be determined
* @throws IOException
- * @throws InterruptedException
*/
private EnvelopeReceiverList getReceiverList(JobID jobID, ChannelID sourceChannelID, boolean reportException) throws IOException {
EnvelopeReceiverList receiverList = this.receiverCache.get(sourceChannelID);
@@ -383,8 +387,10 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
this.receiverCache.put(sourceChannelID, receiverList);
if (LOG.isDebugEnabled()) {
- LOG.debug("Receivers for source channel ID " + sourceChannelID + " at task manager " + this.connectionInfo +
- ": " + receiverList);
+ LOG.debug(String.format("Receiver for %s: %s [%s])",
+ sourceChannelID,
+ receiverList.hasLocalReceiver() ? receiverList.getLocalReceiver() : receiverList.getRemoteReceiver(),
+ receiverList.hasLocalReceiver() ? "local" : "remote"));
}
return receiverList;
@@ -436,7 +442,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
} catch (InterruptedException e) {
throw new IOException(e.getMessage());
}
-
+
srcBuffer.copyToBuffer(destBuffer);
envelope.setBuffer(destBuffer);
srcBuffer.recycleBuffer();
@@ -453,7 +459,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
generateSenderHint(envelope, remoteReceiver);
}
- this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, envelope);
+ this.nettyConnectionManager.enqueue(envelope, remoteReceiver);
success = true;
}
} finally {
@@ -501,7 +507,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
generateSenderHint(envelope, remoteReceiver);
}
- this.networkConnectionManager.queueEnvelopeForTransfer(remoteReceiver, envelope);
+ this.nettyConnectionManager.enqueue(envelope, remoteReceiver);
}
}
@@ -597,7 +603,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
// check if the receiver is already gone
if (receiverList == null) {
- return this.discardingDataPool;
+ return this.discardBufferPool;
}
if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) {
@@ -610,7 +616,7 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
if (channel == null) {
// receiver is already canceled
- return this.discardingDataPool;
+ return this.discardBufferPool;
}
if (!channel.isInputChannel()) {
@@ -633,8 +639,6 @@ public final class ChannelManager implements EnvelopeDispatcher, BufferProviderB
bufferPool.logBufferUtilization();
}
- this.networkConnectionManager.logBufferUtilization();
-
System.out.println("\tIncoming connections:");
for (Channel channel : this.channels.values()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/Envelope.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/Envelope.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/Envelope.java
new file mode 100644
index 0000000..0f8bae8
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/Envelope.java
@@ -0,0 +1,178 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.serialization.DataInputDeserializer;
+import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
+import eu.stratosphere.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public final class Envelope {
+
+ private final JobID jobID;
+
+ private final ChannelID source;
+
+ private final int sequenceNumber;
+
+ private ByteBuffer serializedEventList;
+
+ private Buffer buffer;
+
+ public Envelope(int sequenceNumber, JobID jobID, ChannelID source) {
+ this.sequenceNumber = sequenceNumber;
+ this.jobID = jobID;
+ this.source = source;
+ }
+
+ private Envelope(Envelope toDuplicate) {
+ this.jobID = toDuplicate.jobID;
+ this.source = toDuplicate.source;
+ this.sequenceNumber = toDuplicate.sequenceNumber;
+ this.serializedEventList = null;
+ this.buffer = null;
+ }
+
+ public Envelope duplicate() {
+ Envelope duplicate = new Envelope(this);
+ if (hasBuffer()) {
+ duplicate.setBuffer(this.buffer.duplicate());
+ }
+
+ return duplicate;
+ }
+
+ public Envelope duplicateWithoutBuffer() {
+ return new Envelope(this);
+ }
+
+ public JobID getJobID() {
+ return this.jobID;
+ }
+
+ public ChannelID getSource() {
+ return this.source;
+ }
+
+ public int getSequenceNumber() {
+ return this.sequenceNumber;
+ }
+
+ public void setEventsSerialized(ByteBuffer serializedEventList) {
+ if (this.serializedEventList != null) {
+ throw new IllegalStateException("Event list has already been set.");
+ }
+
+ this.serializedEventList = serializedEventList;
+ }
+
+ public void serializeEventList(List<? extends AbstractEvent> eventList) {
+ if (this.serializedEventList != null) {
+ throw new IllegalStateException("Event list has already been set.");
+ }
+
+ this.serializedEventList = serializeEvents(eventList);
+ }
+
+ public ByteBuffer getEventsSerialized() {
+ return this.serializedEventList;
+ }
+
+ public List<? extends AbstractEvent> deserializeEvents() {
+ return deserializeEvents(getClass().getClassLoader());
+ }
+
+	/** Decodes the serialized events, loading event classes via {@code classloader} (this class' loader if null). */
+	public List<? extends AbstractEvent> deserializeEvents(ClassLoader classloader) {
+		if (this.serializedEventList == null) {
+			return Collections.emptyList(); // no event list has been set on this envelope
+		}
+		// Class.forName treats a null loader as the bootstrap loader, so fall back to our own loader instead.
+		ClassLoader loader = classloader != null ? classloader : getClass().getClassLoader();
+		try {
+			DataInputDeserializer deserializer = new DataInputDeserializer(this.serializedEventList);
+			// Wire format (see serializeEvents): event count, then per event its class name and its payload.
+			int numEvents = deserializer.readInt();
+			ArrayList<AbstractEvent> events = new ArrayList<AbstractEvent>(numEvents);
+			for (int i = 0; i < numEvents; i++) {
+				String className = deserializer.readUTF();
+				Class<? extends AbstractEvent> clazz;
+				try {
+					clazz = Class.forName(className, true, loader).asSubclass(AbstractEvent.class);
+				} catch (ClassNotFoundException e) {
+					throw new RuntimeException("Could not load event class '" + className + "'.", e);
+				} catch (ClassCastException e) {
+					throw new RuntimeException("The class '" + className + "' is no valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
+				}
+
+				AbstractEvent evt = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
+				evt.read(deserializer);
+				events.add(evt);
+			}
+
+			return events;
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Error while deserializing the events.", e);
+		}
+	}
+
+ public void setBuffer(Buffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public Buffer getBuffer() {
+ return this.buffer;
+ }
+
+ private ByteBuffer serializeEvents(List<? extends AbstractEvent> events) {
+ try {
+ // create the serialized event list
+ DataOutputSerializer serializer = events.size() == 0
+ ? new DataOutputSerializer(4)
+ : new DataOutputSerializer(events.size() * 32);
+ serializer.writeInt(events.size());
+
+ for (AbstractEvent evt : events) {
+ serializer.writeUTF(evt.getClass().getName());
+ evt.write(serializer);
+ }
+
+ return serializer.wrapAsByteBuffer();
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Error while serializing the task events.", e);
+ }
+ }
+
+ public boolean hasBuffer() {
+ return this.buffer != null;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Envelope %d [source id: %s, buffer size: %d, events size: %d]",
+ this.sequenceNumber, this.getSource(), this.buffer == null ? -1 : this.buffer.size(),
+ this.serializedEventList == null ? -1 : this.serializedEventList.remaining());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeDispatcher.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeDispatcher.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeDispatcher.java
new file mode 100644
index 0000000..372ff92
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeDispatcher.java
@@ -0,0 +1,46 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.io.IOException;
+
+/**
+ * An envelope dispatcher receives {@link Envelope}s and sends them to all of its destinations.
+ */
+public interface EnvelopeDispatcher {
+
+ /**
+ * Dispatches an envelope from an output channel to the receiving input channels (forward flow).
+ *
+ * @param envelope envelope to be sent
+ */
+ void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException;
+
+ /**
+ * Dispatches an envelope from an input channel to the receiving output channels (backwards flow).
+ *
+ * @param envelope envelope to be sent
+ */
+ void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException;
+
+ /**
+ * Dispatches an envelope from an incoming TCP connection.
+ * <p>
+ * After an envelope has been constructed from a TCP socket, this method is called to send the envelope to the
+ * receiving input channel.
+ *
+ * @param envelope envelope to be sent
+ */
+ void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeReceiverList.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeReceiverList.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeReceiverList.java
new file mode 100644
index 0000000..d53d728
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/EnvelopeReceiverList.java
@@ -0,0 +1,75 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.net.InetAddress;
+
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
+import eu.stratosphere.runtime.io.network.RemoteReceiver;
+
+/**
+ * An envelope receiver list contains all recipients of an envelope. There are two different types of
+ * receivers: local receivers identified by {@link ChannelID} objects and remote receivers identified by
+ * {@link RemoteReceiver} objects; a list holds at most one receiver of each type.
+ * <p>
+ * This class is thread-safe.
+ *
+ */
+public class EnvelopeReceiverList {
+
+ private final ChannelID localReceiver;
+
+ private final RemoteReceiver remoteReceiver;
+
+ public EnvelopeReceiverList(ConnectionInfoLookupResponse cilr) {
+ this.localReceiver = cilr.getLocalTarget();
+ this.remoteReceiver = cilr.getRemoteTarget();
+ }
+
+ public EnvelopeReceiverList(ChannelID localReceiver) {
+ this.localReceiver = localReceiver;
+ this.remoteReceiver = null;
+ }
+
+ public EnvelopeReceiverList(RemoteReceiver remoteReceiver) {
+ this.localReceiver = null;
+ this.remoteReceiver = remoteReceiver;
+ }
+
+ public boolean hasLocalReceiver() {
+ return this.localReceiver != null;
+ }
+
+ public boolean hasRemoteReceiver() {
+ return this.remoteReceiver != null;
+ }
+
+ public int getTotalNumberOfReceivers() {
+ return (this.localReceiver == null ? 0 : 1) + (this.remoteReceiver == null ? 0 : 1);
+ }
+
+ public RemoteReceiver getRemoteReceiver() {
+ return this.remoteReceiver;
+ }
+
+ public ChannelID getLocalReceiver() {
+ return this.localReceiver;
+ }
+
+ @Override
+ public String toString() {
+ return "local receiver: " + this.localReceiver + ", remote receiver: " + this.remoteReceiver;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
deleted file mode 100644
index 44ec642..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
-import eu.stratosphere.runtime.io.network.tcp.IncomingConnectionThread;
-import eu.stratosphere.runtime.io.network.tcp.OutgoingConnection;
-import eu.stratosphere.runtime.io.network.tcp.OutgoingConnectionThread;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-/**
- * The network connection manager manages incoming and outgoing network connection from and to other hosts.
- * <p>
- * This class is thread-safe.
- *
- */
-public final class NetworkConnectionManager {
-
- /**
- * The default number of threads dealing with outgoing connections.
- */
- private static final int DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS = 1;
-
- /**
- * The default number of connection retries before giving up.
- */
- private static final int DEFAULT_NUMBER_OF_CONNECTION_RETRIES = 10;
-
- /**
- * List of active threads dealing with outgoing connections.
- */
- private final List<OutgoingConnectionThread> outgoingConnectionThreads = new CopyOnWriteArrayList<OutgoingConnectionThread>();
-
- /**
- * Thread dealing with incoming connections.
- */
- private final IncomingConnectionThread incomingConnectionThread;
-
- /**
- * Map containing currently active outgoing connections.
- */
- private final ConcurrentMap<RemoteReceiver, OutgoingConnection> outgoingConnections = new ConcurrentHashMap<RemoteReceiver, OutgoingConnection>();
-
- /**
- * The number of connection retries before giving up.
- */
- private final int numberOfConnectionRetries;
-
- /**
- * A buffer provider for read buffers
- */
- private final ChannelManager channelManager;
-
- public NetworkConnectionManager(final ChannelManager channelManager,
- final InetAddress bindAddress, final int dataPort) throws IOException {
-
- final Configuration configuration = GlobalConfiguration.getConfiguration();
-
- this.channelManager = channelManager;
-
- // Start the connection threads
- final int numberOfOutgoingConnectionThreads = configuration.getInteger(
- "channel.network.numberOfOutgoingConnectionThreads", DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS);
-
- for (int i = 0; i < numberOfOutgoingConnectionThreads; i++) {
- final OutgoingConnectionThread outgoingConnectionThread = new OutgoingConnectionThread();
- outgoingConnectionThread.start();
- this.outgoingConnectionThreads.add(outgoingConnectionThread);
- }
-
- this.incomingConnectionThread = new IncomingConnectionThread(
- this.channelManager, true, new InetSocketAddress(bindAddress, dataPort));
- this.incomingConnectionThread.start();
-
- this.numberOfConnectionRetries = configuration.getInteger("channel.network.numberOfConnectionRetries",
- DEFAULT_NUMBER_OF_CONNECTION_RETRIES);
- }
-
- /**
- * Randomly selects one of the active threads dealing with outgoing connections.
- *
- * @return one of the active threads dealing with outgoing connections
- */
- private OutgoingConnectionThread getOutgoingConnectionThread() {
-
- return this.outgoingConnectionThreads.get((int) (this.outgoingConnectionThreads.size() * Math.random()));
- }
-
- /**
- * Queues an envelope for transfer to a particular target host.
- *
- * @param remoteReceiver
- * the address of the remote receiver
- * @param envelope
- * the envelope to be transfered
- */
- public void queueEnvelopeForTransfer(final RemoteReceiver remoteReceiver, final Envelope envelope) {
-
- getOutgoingConnection(remoteReceiver).queueEnvelope(envelope);
- }
-
- /**
- * Returns (and possibly creates) the outgoing connection for the given target address.
- *
- * @param targetAddress
- * the address of the connection target
- * @return the outgoing connection object
- */
- private OutgoingConnection getOutgoingConnection(final RemoteReceiver remoteReceiver) {
-
- OutgoingConnection outgoingConnection = this.outgoingConnections.get(remoteReceiver);
-
- if (outgoingConnection == null) {
-
- outgoingConnection = new OutgoingConnection(remoteReceiver, getOutgoingConnectionThread(),
- this.numberOfConnectionRetries);
-
- final OutgoingConnection oldEntry = this.outgoingConnections
- .putIfAbsent(remoteReceiver, outgoingConnection);
-
- // We had a race, use the old value
- if (oldEntry != null) {
- outgoingConnection = oldEntry;
- }
- }
-
- return outgoingConnection;
- }
-
- public void shutDown() {
-
- // Interrupt the threads we started
- this.incomingConnectionThread.interrupt();
-
- final Iterator<OutgoingConnectionThread> it = this.outgoingConnectionThreads.iterator();
- while (it.hasNext()) {
- it.next().interrupt();
- }
- }
-
- public void logBufferUtilization() {
-
- System.out.println("\tOutgoing connections:");
-
- final Iterator<Map.Entry<RemoteReceiver, OutgoingConnection>> it = this.outgoingConnections.entrySet()
- .iterator();
-
- while (it.hasNext()) {
-
- final Map.Entry<RemoteReceiver, OutgoingConnection> entry = it.next();
- System.out.println("\t\tOC " + entry.getKey() + ": " + entry.getValue().getNumberOfQueuedWriteBuffers());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
index 32be058..65287f7 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/SenderHintEvent.java
@@ -21,7 +21,6 @@ import java.util.List;
import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
public final class SenderHintEvent extends AbstractEvent {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
index 1d23e93..af08aa8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferAvailabilityListener.java
@@ -1,5 +1,5 @@
/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -13,16 +13,19 @@
package eu.stratosphere.runtime.io.network.bufferprovider;
+import eu.stratosphere.runtime.io.Buffer;
+
/**
- * This interface must be implemented to receive a notification from a {@link BufferProvider} when an empty
- * {@link eu.stratosphere.runtime.io.Buffer} has
- * become available again.
- *
+ * This interface must be implemented to receive an asynchronous callback from
+ * a {@link BufferProvider} as soon as a buffer has become available again.
*/
public interface BufferAvailabilityListener {
/**
- * Indicates that at least one {@link eu.stratosphere.runtime.io.Buffer} has become available again.
+ * Returns a Buffer to the listener.
+ * <p>
+ * Note: the listener has to adjust the size of the returned Buffer to the
+ * requested size manually via {@link Buffer#limitSize(int)}.
*/
- void bufferAvailable();
+ void bufferAvailable(Buffer buffer) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
index e3085ee..d82b427 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProvider.java
@@ -65,5 +65,21 @@ public interface BufferProvider {
* @return <code>true</code> if the registration has been successful; <code>false</code> if the registration
* failed, because the buffer pool was not empty or has already been destroyed
*/
- boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener);
+ BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener);
+
+ public enum BufferAvailabilityRegistration {
+ NOT_REGISTERED_BUFFER_AVAILABLE(false),
+ NOT_REGISTERED_BUFFER_POOL_DESTROYED(false),
+ REGISTERED(true);
+
+ private final boolean isSuccessful;
+
+ private BufferAvailabilityRegistration(boolean isSuccessful) {
+ this.isSuccessful = isSuccessful;
+ }
+
+ public boolean isSuccessful() {
+ return isSuccessful;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java
new file mode 100644
index 0000000..5daa509
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/DiscardBufferPool.java
@@ -0,0 +1,51 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.BufferRecycler;
+
+/** {@link BufferProvider} that never supplies a buffer: requests yield {@code null} and listeners are always rejected. */
+public final class DiscardBufferPool implements BufferProvider, BufferRecycler {
+ @Override
+ public Buffer requestBuffer(int minBufferSize) {
+ return null; // no buffer is ever handed out, whatever size is asked for
+ }
+
+ @Override
+ public Buffer requestBufferBlocking(int minBufferSize) {
+ return null; // does not block: returns immediately without a buffer
+ }
+
+ @Override
+ public int getBufferSize() {
+ return 0; // this pool holds no buffers
+ }
+
+ @Override
+ public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+ return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED; // listener is ignored, never stored
+ }
+
+ @Override
+ public void reportAsynchronousEvent() {
+ throw new UnsupportedOperationException(); // not supported by this pool
+ }
+
+ @Override
+ public void recycle(MemorySegment buffer) {
+ throw new UnsupportedOperationException(); // nothing is handed out, so recycling is rejected
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
index e8aeb11..3eb10c1 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
@@ -184,20 +184,20 @@ public final class LocalBufferPool implements BufferProvider {
}
@Override
- public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+ public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
synchronized (this.buffers) {
if (!this.buffers.isEmpty()) {
- return false;
+ return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_AVAILABLE; // this.buffers is not empty; checked before isDestroyed
}
if (this.isDestroyed) {
- return false;
+ return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED; // pool is destroyed, listener is not queued
}
this.listeners.add(listener);
}
- return true;
+ return BufferAvailabilityRegistration.REGISTERED; // listener was added to this.listeners under the buffers lock
}
/**
@@ -221,7 +221,7 @@ public final class LocalBufferPool implements BufferProvider {
}
this.globalBufferPool.returnBuffer(this.buffers.poll());
- this.numRequestedBuffers --;
+ this.numRequestedBuffers--;
}
this.buffers.notify();
@@ -294,11 +294,17 @@ public final class LocalBufferPool implements BufferProvider {
this.globalBufferPool.returnBuffer(buffer);
this.numRequestedBuffers--;
} else {
- this.buffers.add(buffer);
- this.buffers.notify();
-
- while (!this.listeners.isEmpty()) {
- this.listeners.poll().bufferAvailable();
+ if (!this.listeners.isEmpty()) {
+ Buffer availableBuffer = new Buffer(buffer, buffer.size(), this.recycler);
+ try {
+ this.listeners.poll().bufferAvailable(availableBuffer);
+ } catch (Exception e) {
+ this.buffers.add(buffer);
+ this.buffers.notify();
+ }
+ } else {
+ this.buffers.add(buffer);
+ this.buffers.notify();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java
deleted file mode 100644
index 217a5ce..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.bufferprovider;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.BufferRecycler;
-
-/**
- *
- */
-public final class SerialSingleBufferPool implements BufferProvider, BufferRecycler {
-
- private final Buffer buffer;
-
- /** Size of the buffer in this pool */
- private final int bufferSize;
-
-
- // -----------------------------------------------------------------------------------------------------------------
-
- public SerialSingleBufferPool(int bufferSize) {
- this.buffer = new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, this);
- this.bufferSize = bufferSize;
- }
-
- // -----------------------------------------------------------------------------------------------------------------
-
- @Override
- public Buffer requestBuffer(int minBufferSize) {
- if (minBufferSize <= this.bufferSize) {
- return this.buffer.duplicate();
- }
- else {
- throw new IllegalArgumentException("Requesting buffer with size " + minBufferSize + ". Pool's buffer size is " + this.bufferSize);
- }
- }
-
- @Override
- public Buffer requestBufferBlocking(int minBufferSize) {
- if (minBufferSize <= this.bufferSize) {
- return this.buffer.duplicate();
- }
- else {
- throw new IllegalArgumentException("Requesting buffer with size " + minBufferSize + ". Pool's buffer size is " + this.bufferSize);
- }
- }
-
- @Override
- public int getBufferSize() {
- return this.bufferSize;
- }
-
- @Override
- public void reportAsynchronousEvent() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void recycle(MemorySegment buffer) {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java
deleted file mode 100644
index a692aec..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.runtime.io.serialization.DataInputDeserializer;
-import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
-import eu.stratosphere.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public final class Envelope {
-
- private final JobID jobID;
-
- private final ChannelID source;
-
- private final int sequenceNumber;
-
- private ByteBuffer serializedEventList;
-
- private Buffer buffer;
-
- public Envelope(int sequenceNumber, JobID jobID, ChannelID source) {
- this.sequenceNumber = sequenceNumber;
- this.jobID = jobID;
- this.source = source;
- }
-
- private Envelope(Envelope toDuplicate) {
- this.jobID = toDuplicate.jobID;
- this.source = toDuplicate.source;
- this.sequenceNumber = toDuplicate.sequenceNumber;
- this.serializedEventList = null;
- this.buffer = null;
- }
-
- public Envelope duplicate() {
- Envelope duplicate = new Envelope(this);
- if (hasBuffer()) {
- duplicate.setBuffer(this.buffer.duplicate());
- }
-
- return duplicate;
- }
-
- public Envelope duplicateWithoutBuffer() {
- return new Envelope(this);
- }
-
- public JobID getJobID() {
- return this.jobID;
- }
-
- public ChannelID getSource() {
- return this.source;
- }
-
- public int getSequenceNumber() {
- return this.sequenceNumber;
- }
-
- public void setEventsSerialized(ByteBuffer serializedEventList) {
- if (this.serializedEventList != null)
- throw new IllegalStateException("Event list has already been set.");
-
- this.serializedEventList = serializedEventList;
- }
-
- public void serializeEventList(List<? extends AbstractEvent> eventList) {
- if (this.serializedEventList != null)
- throw new IllegalStateException("Event list has already been set.");
-
- this.serializedEventList = serializeEvents(eventList);
- }
-
- public ByteBuffer getEventsSerialized() {
- return this.serializedEventList;
- }
-
- public List<? extends AbstractEvent> deserializeEvents() {
- return deserializeEvents(getClass().getClassLoader());
- }
-
- public List<? extends AbstractEvent> deserializeEvents(ClassLoader classloader) {
- if (this.serializedEventList == null) {
- return Collections.emptyList();
- }
-
- try {
- DataInputDeserializer deserializer = new DataInputDeserializer(this.serializedEventList);
-
- int numEvents = deserializer.readInt();
- ArrayList<AbstractEvent> events = new ArrayList<AbstractEvent>(numEvents);
-
- for (int i = 0; i < numEvents; i++) {
- String className = deserializer.readUTF();
- Class<? extends AbstractEvent> clazz;
- try {
- clazz = Class.forName(className).asSubclass(AbstractEvent.class);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Could not load event class '" + className + "'.", e);
- } catch (ClassCastException e) {
- throw new RuntimeException("The class '" + className + "' is no valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
- }
-
- AbstractEvent evt = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
- evt.read(deserializer);
-
- events.add(evt);
- }
-
- return events;
- }
- catch (IOException e) {
- throw new RuntimeException("Error while deserializing the events.", e);
- }
- }
-
- public void setBuffer(Buffer buffer) {
- this.buffer = buffer;
- }
-
- public Buffer getBuffer() {
- return this.buffer;
- }
-
- private ByteBuffer serializeEvents(List<? extends AbstractEvent> events) {
- try {
- // create the serialized event list
- DataOutputSerializer serializer = events.size() == 0
- ? new DataOutputSerializer(4)
- : new DataOutputSerializer(events.size() * 32);
- serializer.writeInt(events.size());
-
- for (AbstractEvent evt : events) {
- serializer.writeUTF(evt.getClass().getName());
- evt.write(serializer);
- }
-
- return serializer.wrapAsByteBuffer();
- }
- catch (IOException e) {
- throw new RuntimeException("Error while serializing the task events.", e);
- }
- }
-
- public boolean hasBuffer() {
- return this.buffer != null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java
deleted file mode 100644
index 2b69c0d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import java.io.IOException;
-
-/**
- * A envelope dispatcher receives {@link Envelope}s and sends them to all of its destinations.
- */
-public interface EnvelopeDispatcher {
-
- /**
- * Dispatches an envelope from an output channel to the receiving input channels (forward flow).
- *
- * @param envelope envelope to be sent
- */
- void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException;
-
- /**
- * Dispatches an envelope from an input channel to the receiving output channels (backwards flow).
- *
- * @param envelope envelope to be sent
- */
- void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException;
-
- /**
- * Dispatches an envelope from an incoming TCP connection.
- * <p>
- * After an envelope has been constructed from a TCP socket, this method is called to send the envelope to the
- * receiving input channel.
- *
- * @param envelope envelope to be sent
- */
- void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java
deleted file mode 100644
index 7b7e178..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.ReadableByteChannel;
-
-public class EnvelopeReader {
-
- public enum DeserializationState {
- COMPLETE,
- PENDING,
- NO_BUFFER_AVAILABLE;
- }
-
- private final BufferProviderBroker bufferProviderBroker;
-
- private final ByteBuffer headerBuffer;
-
- private ByteBuffer currentHeaderBuffer;
-
- private ByteBuffer currentEventsList;
-
- private ByteBuffer currentDataBuffer;
-
- private int bufferRequestPendingWithSize;
-
-
- private Envelope pendingEnvelope;
-
- private Envelope constructedEnvelope;
-
-
- public BufferProvider bufferProvider;
-
- private JobID lastDeserializedJobID;
-
- private ChannelID lastDeserializedSourceID;
-
-
- public EnvelopeReader(BufferProviderBroker bufferProviderBroker) {
- this.bufferProviderBroker = bufferProviderBroker;
-
- this.headerBuffer = ByteBuffer.allocateDirect(EnvelopeWriter.HEADER_SIZE);
- this.headerBuffer.order(ByteOrder.LITTLE_ENDIAN);
-
- this.currentHeaderBuffer = this.headerBuffer;
- }
-
- public DeserializationState readNextChunk(ReadableByteChannel channel) throws IOException {
-
- // 1) check if the header is pending
- if (this.currentHeaderBuffer != null) {
- ByteBuffer header = this.currentHeaderBuffer;
-
- channel.read(header);
- if (header.hasRemaining()) {
- // not finished with the header
- return DeserializationState.PENDING;
- } else {
- // header done, construct the envelope
- this.currentHeaderBuffer = null;
-
- Envelope env = constructEnvelopeFromHeader(header);
- this.pendingEnvelope = env;
-
- // check for events and data
- int eventsSize = getEventListSize(header);
- int bufferSize = getBufferSize(header);
-
- // make the events list the next buffer to be read
- if (eventsSize > 0) {
- this.currentEventsList = ByteBuffer.allocate(eventsSize);
- }
-
- // if we have a data buffer, we need memory segment for it
- // we may not immediately get the memory segment, though, so we first record
- // that we need it
- if (bufferSize > 0) {
- this.bufferRequestPendingWithSize = bufferSize;
- }
- }
- }
-
- // 2) read the eventList, if it should have one
- if (this.currentEventsList != null) {
- channel.read(this.currentEventsList);
- if (this.currentEventsList.hasRemaining()) {
- // events list still incomplete
- return DeserializationState.PENDING;
- } else {
- this.currentEventsList.flip();
- this.pendingEnvelope.setEventsSerialized(this.currentEventsList);
- this.currentEventsList = null;
- }
- }
-
- // 3) check if we need to get a buffer
- if (this.bufferRequestPendingWithSize > 0) {
- Buffer b = getBufferForTarget(this.pendingEnvelope.getJobID(), this.pendingEnvelope.getSource(), this.bufferRequestPendingWithSize);
- if (b == null) {
- // no buffer available at this time. come back later
- return DeserializationState.NO_BUFFER_AVAILABLE;
- } else {
- // buffer is available. set the field so the buffer will be filled
- this.pendingEnvelope.setBuffer(b);
- this.currentDataBuffer = b.getMemorySegment().wrap(0, this.bufferRequestPendingWithSize);
- this.bufferRequestPendingWithSize = 0;
- }
- }
-
- // 4) fill the buffer
- if (this.currentDataBuffer != null) {
- channel.read(this.currentDataBuffer);
- if (this.currentDataBuffer.hasRemaining()) {
- // data buffer incomplete
- return DeserializationState.PENDING;
- } else {
- this.currentDataBuffer = null;
- }
- }
-
- // if we get here, we completed our job, or did nothing, if the deserializer was not
- // reset after the previous envelope
- if (this.pendingEnvelope != null) {
- this.constructedEnvelope = this.pendingEnvelope;
- this.pendingEnvelope = null;
- return DeserializationState.COMPLETE;
- } else {
- throw new IllegalStateException("Error: read() was called before reserializer was reset after the last envelope.");
- }
- }
-
- private Envelope constructEnvelopeFromHeader(ByteBuffer header) throws IOException {
- int magicNumber = header.getInt(EnvelopeWriter.MAGIC_NUMBER_OFFSET);
-
- if (magicNumber != EnvelopeWriter.MAGIC_NUMBER) {
- throw new IOException("Network stream corrupted: invalid magic number in envelope header.");
- }
-
- int seqNum = header.getInt(EnvelopeWriter.SEQUENCE_NUMBER_OFFSET);
- JobID jid = JobID.fromByteBuffer(header, EnvelopeWriter.JOB_ID_OFFSET);
- ChannelID cid = ChannelID.fromByteBuffer(header, EnvelopeWriter.CHANNEL_ID_OFFSET);
- return new Envelope(seqNum, jid, cid);
- }
-
- private int getBufferSize(ByteBuffer header) {
- return header.getInt(EnvelopeWriter.BUFFER_SIZE_OFFSET);
- }
-
- private int getEventListSize(ByteBuffer header) {
- return header.getInt(EnvelopeWriter.EVENTS_SIZE_OFFSET);
- }
-
- private Buffer getBufferForTarget(JobID jid, ChannelID cid, int size) throws IOException {
- if (!(jid.equals(this.lastDeserializedJobID) && cid.equals(this.lastDeserializedSourceID))) {
- this.bufferProvider = this.bufferProviderBroker.getBufferProvider(jid, cid);
- this.lastDeserializedJobID = jid;
- this.lastDeserializedSourceID = cid;
- }
-
- return this.bufferProvider.requestBuffer(size);
- }
-
-
- public Envelope getFullyDeserializedTransferEnvelope() {
- Envelope t = this.constructedEnvelope;
- if (t == null) {
- throw new IllegalStateException("Envelope has not yet been fully constructed.");
- }
-
- this.constructedEnvelope = null;
- return t;
- }
-
- public void reset() {
- this.headerBuffer.clear();
- this.currentHeaderBuffer = this.headerBuffer;
- this.constructedEnvelope = null;
- }
-
- public boolean hasUnfinishedData() {
- return this.pendingEnvelope != null || this.currentHeaderBuffer != null;
- }
-
- public BufferProvider getBufferProvider() {
- return bufferProvider;
- }
-
- public Envelope getPendingEnvelope() {
- return pendingEnvelope;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java
deleted file mode 100644
index f99e1f2..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import java.net.InetAddress;
-
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
-import eu.stratosphere.runtime.io.network.RemoteReceiver;
-
-/**
- * A transfer envelope receiver list contains all recipients of a transfer envelope. Their are three d ifferent types of
- * receivers: Local receivers identified by {@link ChannelID} objects, remote receivers identified by
- * {@link InetAddress} objects and finally checkpoints which are identified by
- * <p>
- * This class is thread-safe.
- *
- */
-public class EnvelopeReceiverList {
-
- private final ChannelID localReceiver;
-
- private final RemoteReceiver remoteReceiver;
-
- public EnvelopeReceiverList(ConnectionInfoLookupResponse cilr) {
- this.localReceiver = cilr.getLocalTarget();
- this.remoteReceiver = cilr.getRemoteTarget();
- }
-
- public EnvelopeReceiverList(ChannelID localReceiver) {
- this.localReceiver = localReceiver;
- this.remoteReceiver = null;
- }
-
- public EnvelopeReceiverList(RemoteReceiver remoteReceiver) {
- this.localReceiver = null;
- this.remoteReceiver = remoteReceiver;
- }
-
- public boolean hasLocalReceiver() {
- return this.localReceiver != null;
- }
-
- public boolean hasRemoteReceiver() {
- return this.remoteReceiver != null;
- }
-
- public int getTotalNumberOfReceivers() {
- return (this.localReceiver == null ? 0 : 1) + (this.remoteReceiver == null ? 0 : 1);
- }
-
- public RemoteReceiver getRemoteReceiver() {
- return this.remoteReceiver;
- }
-
- public ChannelID getLocalReceiver() {
- return this.localReceiver;
- }
-
- @Override
- public String toString() {
- return "local receiver: " + this.localReceiver + ", remote receiver: " + this.remoteReceiver;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java
deleted file mode 100644
index c00e61b..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import eu.stratosphere.nephele.AbstractID;
-import eu.stratosphere.runtime.io.Buffer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.WritableByteChannel;
-
-public class EnvelopeWriter {
-
- protected static final int MAGIC_NUMBER = 0xBADC0FFE;
-
- /**
- * Size of the envelope header: 48 bytes = 4 bytes magic number, 4 bytes sequence number, 16 bytes job id,
- * 16 bytes sender id, 4 bytes bufferSize, 4 bytes event list length
- */
- public static final int HEADER_SIZE = 4 + 4 + 2 * AbstractID.SIZE + 4 + 4;
-
- public static final int MAGIC_NUMBER_OFFSET = 0;
-
- public static final int SEQUENCE_NUMBER_OFFSET = 4;
-
- public static final int JOB_ID_OFFSET = 8;
-
- public static final int CHANNEL_ID_OFFSET = 24;
-
- public static final int BUFFER_SIZE_OFFSET = 40;
-
- public static final int EVENTS_SIZE_OFFSET = 44;
-
- private ByteBuffer currentHeader;
-
- private ByteBuffer currentEvents;
-
- private ByteBuffer currentDataBuffer;
-
- private final ByteBuffer headerBuffer;
-
- public EnvelopeWriter() {
- this.headerBuffer = ByteBuffer.allocateDirect(HEADER_SIZE);
- this.headerBuffer.order(ByteOrder.LITTLE_ENDIAN);
- }
-
- /**
- * @param channel
- * @return True, if the writer has more pending data for the current envelope, false if not.
- *
- * @throws java.io.IOException
- */
- public boolean writeNextChunk(WritableByteChannel channel) throws IOException {
- // 1) check if the the header is still pending
- if (this.currentHeader != null) {
- channel.write(this.currentHeader);
-
- if (this.currentHeader.hasRemaining()) {
- // header was not fully written, so we can leave this method
- return true;
- } else {
- this.currentHeader = null;
- }
- }
-
- // 2) check if there are events pending
- if (this.currentEvents != null) {
- channel.write(this.currentEvents);
- if (this.currentEvents.hasRemaining()) {
- // events were not fully written, so leave this method
- return true;
- } else {
- this.currentEvents = null;
- }
- }
-
- // 3) write the data buffer
- if (this.currentDataBuffer != null) {
- channel.write(this.currentDataBuffer);
- if (this.currentDataBuffer.hasRemaining()) {
- return true;
- } else {
- this.currentDataBuffer = null;
- }
- }
-
- return false;
- }
-
- public void setEnvelopeForWriting(Envelope env) {
- // header
- constructHeader(env);
- this.currentHeader = this.headerBuffer;
-
- // events (possibly null)
- this.currentEvents = env.getEventsSerialized();
-
- // data buffer (possibly null)
- Buffer buf = env.getBuffer();
- if (buf != null && buf.size() > 0) {
- this.currentDataBuffer = buf.getMemorySegment().wrap(0, buf.size());
- }
- }
-
- private void constructHeader(Envelope env) {
- final ByteBuffer buf = this.headerBuffer;
-
- buf.clear(); // reset
- buf.putInt(MAGIC_NUMBER);
- buf.putInt(env.getSequenceNumber()); // sequence number (4 bytes)
- env.getJobID().write(buf); // job Id (16 bytes)
- env.getSource().write(buf); // producerId (16 bytes)
-
- // buffer size
- buf.putInt(env.getBuffer() == null ? 0 : env.getBuffer().size());
-
- // size of event list
- buf.putInt(env.getEventsSerialized() == null ? 0 : env.getEventsSerialized().remaining());
-
- buf.flip();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java
deleted file mode 100644
index f7d49bf..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
-
-/**
- * This exception is thrown to indicate that the deserialization process of a {@link Envelope} could not be
- * continued because a {@link Buffer} to store the envelope's content is currently not available.
- *
- */
-public final class NoBufferAvailableException extends Exception {
-
- /**
- * Generated serial UID.
- */
- private static final long serialVersionUID = -9164212953646457026L;
-
- /**
- * The buffer provider which could not deliver a buffer.
- */
- private final BufferProvider bufferProvider;
-
- /**
- * Constructs a new exception.
- *
- * @param bufferProvider
- * the buffer provider which could not deliver a buffer
- */
- public NoBufferAvailableException(final BufferProvider bufferProvider) {
- this.bufferProvider = bufferProvider;
- }
-
- /**
- * Returns the buffer provider which could not deliver a buffer.
- *
- * @return the buffer provider which could not deliver a buffer
- */
- public BufferProvider getBufferProvider() {
- return this.bufferProvider;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
new file mode 100644
index 0000000..1ab1871
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoder.java
@@ -0,0 +1,344 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
+import eu.stratosphere.runtime.io.network.Envelope;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter implements BufferAvailabilityListener {
+
+ private final BufferProviderBroker bufferProviderBroker; // looked up for the BufferProvider of a (job, source channel) pair
+
+ private final BufferAvailabilityChangedTask bufferAvailabilityChangedTask = new BufferAvailabilityChangedTask(); // submitted to the event loop by bufferAvailable(Buffer)
+
+ private final ConcurrentLinkedQueue<Buffer> bufferBroker = new ConcurrentLinkedQueue<Buffer>(); // bufferAvailable(Buffer) offers, the task polls
+
+ private final ByteBuffer headerBuffer; // filled, parsed and cleared again for every envelope header
+
+ private Envelope currentEnvelope; // envelope being decoded; null until a header has been parsed
+
+ private ByteBuffer currentEventsBuffer; // collects the current envelope's serialized events; null if none or already complete
+
+ private ByteBuffer currentDataBuffer; // wraps the memory of the current envelope's Buffer while data is copied in; null otherwise
+
+ private int currentBufferRequestSize; // data size from the header for which a Buffer must still be obtained; 0 otherwise
+
+ private BufferProvider currentBufferProvider; // provider looked up for lastJobId / lastSourceId
+
+ private JobID lastJobId; // job of the most recent buffer request
+
+ private ChannelID lastSourceId; // source channel of the most recent buffer request
+
+ private ByteBuf stagedBuffer; // input retained while waiting for a Buffer to become available; null otherwise
+
+ private ChannelHandlerContext channelHandlerContext; // captured in channelActive(...)
+
+ private int bytesToSkip; // bytes still to be skipped in upcoming reads after a buffer pool was destroyed
+
+ private enum DecoderState { // result of a single decodeEnvelope(ByteBuf) call
+ COMPLETE, // the current envelope was decoded completely and can be forwarded
+ PENDING, // the input ran out before the current envelope was complete
+ NO_BUFFER_AVAILABLE // the buffer provider returned no buffer for the envelope's data
+ }
+
+ public InboundEnvelopeDecoder(BufferProviderBroker bufferProviderBroker) {
+     this.headerBuffer = ByteBuffer.allocateDirect(OutboundEnvelopeEncoder.HEADER_SIZE); // reused for each header
+     this.bufferProviderBroker = bufferProviderBroker;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ if (this.channelHandlerContext == null) { // keep the first context seen
+ this.channelHandlerContext = ctx; // used by bufferAvailable(Buffer) to reach the channel's event loop
+ }
+
+ super.channelActive(ctx);
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+     final ByteBuf in = (ByteBuf) msg;
+
+     // Auto-read is switched off while a buffer is staged, so a read at this point is unexpected.
+     if (this.stagedBuffer != null) {
+         in.release(); // this handler owns the message and must not leak it when rejecting it
+         throw new IllegalStateException("No channel read event should be fired " +
+             "as long as a buffer is staged.");
+     }
+
+     // Finish skipping the data of an envelope that was dropped because its buffer pool was destroyed.
+     if (this.bytesToSkip > 0) {
+         this.bytesToSkip = skipBytes(in, this.bytesToSkip);
+         if (this.bytesToSkip > 0) {
+             in.release(); // the whole buffer was skipped and more remains to be skipped
+             return;
+         }
+     }
+     decodeBuffer(in, ctx);
+ }
+
+ /**
+  * Decodes all Envelopes contained in a Netty ByteBuf and forwards them in the pipeline.
+  * <p>
+  * If no buffer is available for an envelope's data, a buffer availability listener is registered, auto-read is
+  * switched off and the input buffer is retained and staged for later consumption.
+  * @param in bytes received from the channel; released here once fully consumed
+  * @param ctx context used to forward decoded envelopes and to toggle auto-read
+  * @return <code>true</code> if the buffer was fully consumed and released, <code>false</code> if it was staged
+  * @throws IOException if decoding an envelope fails
+  */
+ private boolean decodeBuffer(ByteBuf in, ChannelHandlerContext ctx) throws IOException {
+     DecoderState decoderState;
+     while ((decoderState = decodeEnvelope(in)) != DecoderState.PENDING) {
+         if (decoderState == DecoderState.COMPLETE) {
+             ctx.fireChannelRead(this.currentEnvelope);
+             this.currentEnvelope = null;
+         }
+         else if (decoderState == DecoderState.NO_BUFFER_AVAILABLE) {
+             switch (this.currentBufferProvider.registerBufferAvailabilityListener(this)) {
+                 case REGISTERED:
+                     // Stop reading from the channel until the listener is notified.
+                     if (ctx.channel().config().isAutoRead()) {
+                         ctx.channel().config().setAutoRead(false);
+                     }
+                     this.stagedBuffer = in;
+                     this.stagedBuffer.retain();
+                     return false;
+                 case NOT_REGISTERED_BUFFER_AVAILABLE:
+                     // The provider reports an available buffer: retry the request in the next iteration.
+                     continue;
+
+                 case NOT_REGISTERED_BUFFER_POOL_DESTROYED:
+                     // The buffer pool is gone: drop the current envelope and skip over its data.
+                     this.bytesToSkip = skipBytes(in, this.currentBufferRequestSize);
+                     this.currentBufferRequestSize = 0;
+                     this.currentEventsBuffer = null;
+                     this.currentEnvelope = null;
+                     break;
+             }
+         }
+     }
+
+     if (in.isReadable()) {
+         throw new IllegalStateException("Every buffer should have been fully " +
+             "consumed after *successfully* decoding it (if it was not successful, " +
+             "the buffer will be staged for later consumption).");
+     }
+
+     in.release();
+     return true;
+ }
+
+ /**
+ * Notifies the IO thread that a Buffer has become available again.
+ * <p>
+ * This method will be called from outside the Netty IO thread. The caller will be the buffer pool from which the
+ * available buffer comes (i.e. the InputGate).
+ * <p>
+ * The buffer is therefore put into a concurrent queue and picked up by a task run on the channel's event loop.
+ */
+ @Override
+ public void bufferAvailable(Buffer buffer) throws Exception {
+ this.bufferBroker.offer(buffer); // publish the buffer first so that the task finds it when it polls the queue
+ this.channelHandlerContext.channel().eventLoop().execute(this.bufferAvailabilityChangedTask); // continue decoding on the channel's event loop
+ }
+
+ /**
+  * Continues the decoding of a staged buffer after a buffer has become available again.
+  * <p>
+  * This task should be executed by the IO thread to ensure safe access to the staged buffer.
+  */
+ private class BufferAvailabilityChangedTask implements Runnable {
+     @Override
+     public void run() {
+         final Buffer availableBuffer = bufferBroker.poll();
+         if (availableBuffer == null) {
+             throw new IllegalStateException("The BufferAvailabilityChangedTask " +
+                 "should only be executed when a Buffer has been offered " +
+                 "to the Buffer broker (after becoming available).");
+         }
+
+         // Complete the state left behind by the last decodeEnvelope(ByteBuf) call with the available buffer.
+         availableBuffer.limitSize(currentBufferRequestSize);
+         currentEnvelope.setBuffer(availableBuffer);
+         currentDataBuffer = availableBuffer.getMemorySegment().wrap(0, currentBufferRequestSize);
+         currentBufferRequestSize = 0;
+         // Drop the reference taken when staging; decodeBuffer(...) retains again if it has to re-stage.
+         stagedBuffer.release();
+         try {
+             if (decodeBuffer(stagedBuffer, channelHandlerContext)) {
+                 stagedBuffer = null;
+                 channelHandlerContext.channel().config().setAutoRead(true);
+             }
+         } catch (IOException e) {
+             // decodeEnvelope(...) only throws for a later envelope, i.e. after the envelope holding
+             // availableBuffer was forwarded, so the buffer is not recycled here; the error is reported instead.
+             channelHandlerContext.fireExceptionCaught(e);
+         }
+     }
+ }
+
+ // --------------------------------------------------------------------
+
+ /**
+  * Decodes the next part of a single envelope from the given bytes, resuming where the previous call stopped.
+  *
+  * @param in received bytes; consumed as far as the current envelope needs them
+  * @return COMPLETE if the current envelope is fully decoded, PENDING if more input is needed,
+  *         NO_BUFFER_AVAILABLE if no buffer could be obtained for the envelope's data
+  * @throws IOException if the header carries an invalid magic number or the buffer request throws
+  */
+ private DecoderState decodeEnvelope(ByteBuf in) throws IOException {
+     // --------------------------------------------------------------------
+     // (1) header (OutboundEnvelopeEncoder.HEADER_SIZE bytes)
+     // --------------------------------------------------------------------
+     if (this.currentEnvelope == null) {
+         copy(in, this.headerBuffer);
+         if (this.headerBuffer.hasRemaining()) {
+             return DecoderState.PENDING;
+         }
+
+         this.headerBuffer.flip();
+
+         int magicNum = this.headerBuffer.getInt();
+         if (magicNum != OutboundEnvelopeEncoder.MAGIC_NUMBER) {
+             throw new IOException("Network stream corrupted: invalid magic " +
+                 "number in current envelope header.");
+         }
+
+         int seqNum = this.headerBuffer.getInt();
+         JobID jobId = JobID.fromByteBuffer(this.headerBuffer);
+         ChannelID sourceId = ChannelID.fromByteBuffer(this.headerBuffer);
+
+         this.currentEnvelope = new Envelope(seqNum, jobId, sourceId);
+
+         int eventsSize = this.headerBuffer.getInt();
+         int bufferSize = this.headerBuffer.getInt();
+
+         // non-positive sizes mean that the envelope carries no events / no data buffer
+         this.currentEventsBuffer = eventsSize > 0 ? ByteBuffer.allocate(eventsSize) : null;
+         this.currentBufferRequestSize = bufferSize > 0 ? bufferSize : 0;
+
+         this.headerBuffer.clear();
+     }
+
+     // --------------------------------------------------------------------
+     // (2) events (var length)
+     // --------------------------------------------------------------------
+     if (this.currentEventsBuffer != null) {
+         copy(in, this.currentEventsBuffer);
+         if (this.currentEventsBuffer.hasRemaining()) {
+             return DecoderState.PENDING;
+         }
+         this.currentEventsBuffer.flip();
+         this.currentEnvelope.setEventsSerialized(this.currentEventsBuffer);
+         this.currentEventsBuffer = null;
+     }
+
+     // --------------------------------------------------------------------
+     // (3) buffer (var length)
+     // --------------------------------------------------------------------
+     // (a) request a buffer from OUR pool
+     if (this.currentBufferRequestSize > 0) {
+         JobID jobId = this.currentEnvelope.getJobID();
+         ChannelID sourceId = this.currentEnvelope.getSource();
+         Buffer buffer = requestBufferForTarget(jobId, sourceId, this.currentBufferRequestSize);
+
+         if (buffer == null) {
+             return DecoderState.NO_BUFFER_AVAILABLE;
+         }
+
+         this.currentEnvelope.setBuffer(buffer);
+         this.currentDataBuffer = buffer.getMemorySegment().wrap(0, this.currentBufferRequestSize);
+         this.currentBufferRequestSize = 0;
+     }
+
+     // (b) copy data to OUR buffer
+     if (this.currentDataBuffer != null) {
+         copy(in, this.currentDataBuffer);
+         if (this.currentDataBuffer.hasRemaining()) {
+             return DecoderState.PENDING;
+         }
+         this.currentDataBuffer = null;
+     }
+
+     // if we made it to this point, we completed the envelope;
+     // in the other cases we return early with PENDING or NO_BUFFER_AVAILABLE
+     return DecoderState.COMPLETE;
+ }
+
+ private Buffer requestBufferForTarget(JobID jobId, ChannelID sourceId, int size) throws IOException {
+ // Request the buffer from the target buffer provider, which is the
+ // InputGate of the receiving InputChannel.
+ if (!(jobId.equals(this.lastJobId) && sourceId.equals(this.lastSourceId))) {
+ this.lastJobId = jobId;
+ this.lastSourceId = sourceId;
+
+ this.currentBufferProvider = this.bufferProviderBroker.getBufferProvider(jobId, sourceId);
+ }
+
+ return this.currentBufferProvider.requestBuffer(size);
+ }
+
+ /**
+ * Copies min(from.readableBytes(), to.remaining() bytes from Nettys ByteBuf to the Java NIO ByteBuffer.
+ */
+ private void copy(ByteBuf src, ByteBuffer dst) {
+ // This branch is necessary, because an Exception is thrown if the
+ // destination buffer has more remaining (writable) bytes than
+ // currently readable from the Netty ByteBuf source.
+ if (src.isReadable()) {
+ if (src.readableBytes() < dst.remaining()) {
+ int oldLimit = dst.limit();
+
+ dst.limit(dst.position() + src.readableBytes());
+ src.readBytes(dst);
+ dst.limit(oldLimit);
+ }
+ else {
+ src.readBytes(dst);
+ }
+ }
+ }
+
+ /**
+ * Skips over min(in.readableBytes(), toSkip) bytes in the Netty ByteBuf and returns how many bytes remain to be
+ * skipped.
+ *
+ * @return remaining bytes to be skipped
+ */
+ private int skipBytes(ByteBuf in, int toSkip) {
+ if (toSkip <= in.readableBytes()) {
+ in.readBytes(toSkip);
+ return 0;
+ }
+
+ int remainingToSkip = toSkip - in.readableBytes();
+ in.readerIndex(in.readerIndex() + in.readableBytes());
+
+ return remainingToSkip;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b01038dd/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java
new file mode 100644
index 0000000..d0270b6
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java
@@ -0,0 +1,41 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.runtime.io.network.Envelope;
+import eu.stratosphere.runtime.io.network.EnvelopeDispatcher;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+ /** Inbound handler that passes every received {@link Envelope} on to the {@link EnvelopeDispatcher}. */
+ public class InboundEnvelopeDispatcherHandler extends ChannelInboundHandlerAdapter {
+     private static final Log LOG = LogFactory.getLog(InboundEnvelopeDispatcherHandler.class);
+     private final EnvelopeDispatcher channelManager;
+
+     public InboundEnvelopeDispatcherHandler(EnvelopeDispatcher channelManager) {
+         this.channelManager = channelManager;
+     }
+
+     @Override
+     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+         Envelope envelope = (Envelope) msg;
+         if (LOG.isDebugEnabled()) { // guard avoids formatting the message when debug logging is off
+             LOG.debug(String.format("Decoded envelope with seq num %d from source channel %s",
+                 envelope.getSequenceNumber(), envelope.getSource()));
+         }
+         this.channelManager.dispatchFromNetwork(envelope);
+     }
+ }