You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:15 UTC
[18/34] Offer buffer-oriented API for I/O (#25)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnection.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnection.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnection.java
deleted file mode 100644
index d0fa683..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnection.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SelectionKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.DefaultDeserializer;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.NoBufferAvailableException;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * This class represents an incoming data connection through which data streams are read and transformed into
- * {@link TransferEnvelope} objects. The source of the data stream is a TCP connection.
- *
- */
-public class IncomingConnection {
-
- /**
- * The log object used to report debug information and possible errors.
- */
- private static final Log LOG = LogFactory.getLog(IncomingConnection.class);
-
- /**
- * The readable byte channel through which the input data is retrieved.
- */
- private final ReadableByteChannel readableByteChannel;
-
- /**
- * The {@link DefaultDeserializer} used to transform the read bytes into transfer envelopes which can be
- * passed on to the respective channels.
- */
- private final DefaultDeserializer deserializer;
-
- /**
- * The byte buffered channel manager which handles and dispatches the received transfer envelopes.
- */
- private final ByteBufferedChannelManager byteBufferedChannelManager;
-
- public IncomingConnection(ByteBufferedChannelManager byteBufferedChannelManager,
- ReadableByteChannel readableByteChannel) {
- this.byteBufferedChannelManager = byteBufferedChannelManager;
- this.deserializer = new DefaultDeserializer(byteBufferedChannelManager);
- this.readableByteChannel = readableByteChannel;
- }
-
- public void reportTransmissionProblem(SelectionKey key, IOException ioe) {
-
- LOG.error(StringUtils.stringifyException(ioe));
-
- try {
- this.readableByteChannel.close();
- } catch (IOException e) {
- LOG.debug("An error occurred while closing the byte channel");
- }
-
- // Cancel key
- if (key != null) {
- key.cancel();
- }
-
- // Recycle read buffer
- if (this.deserializer.getBuffer() != null) {
- this.deserializer.getBuffer().recycleBuffer();
- }
-
- this.deserializer.reset();
- }
-
- public void read() throws IOException, InterruptedException, NoBufferAvailableException {
-
- this.deserializer.read(this.readableByteChannel);
-
- final TransferEnvelope transferEnvelope = this.deserializer.getFullyDeserializedTransferEnvelope();
- if (transferEnvelope != null) {
-
- final BufferProvider bufferProvider = this.deserializer.getBufferProvider();
- if (bufferProvider == null) {
- this.byteBufferedChannelManager.processEnvelopeFromNetwork(transferEnvelope, false);
- } else {
- this.byteBufferedChannelManager.processEnvelopeFromNetwork(transferEnvelope, bufferProvider.isShared());
- }
- }
- }
-
- public boolean isCloseUnexpected() {
-
- return this.deserializer.hasUnfinishedData();
- }
-
- public ReadableByteChannel getReadableByteChannel() {
- return this.readableByteChannel;
- }
-
- public void closeConnection(SelectionKey key) {
-
- try {
- this.readableByteChannel.close();
- } catch (IOException ioe) {
- LOG.error("On IOException occured while closing the socket: + " + StringUtils.stringifyException(ioe));
- }
-
- // Cancel key
- if (key != null) {
- key.cancel();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionThread.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionThread.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionThread.java
deleted file mode 100644
index 05c3326..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionThread.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Queue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.NoBufferAvailableException;
-import eu.stratosphere.util.StringUtils;
-
-public class IncomingConnectionThread extends Thread {
-
- private static final Log LOG = LogFactory.getLog(IncomingConnectionThread.class);
-
- private final ByteBufferedChannelManager byteBufferedChannelManager;
-
- private final Selector selector;
-
- private final Queue<SelectionKey> pendingReadEventSubscribeRequests = new ArrayDeque<SelectionKey>();
-
- private final ServerSocketChannel listeningSocket;
-
- private static final class IncomingConnectionBufferAvailListener implements BufferAvailabilityListener {
-
- private final Queue<SelectionKey> pendingReadEventSubscribeRequests;
-
- private final SelectionKey key;
-
- private IncomingConnectionBufferAvailListener(final Queue<SelectionKey> pendingReadEventSubscribeRequests,
- final SelectionKey key) {
-
- this.pendingReadEventSubscribeRequests = pendingReadEventSubscribeRequests;
- this.key = key;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void bufferAvailable() {
-
- synchronized (this.pendingReadEventSubscribeRequests) {
- this.pendingReadEventSubscribeRequests.add(this.key);
- }
- }
- }
-
- public IncomingConnectionThread(ByteBufferedChannelManager byteBufferedChannelManager,
- boolean isListeningThread, InetSocketAddress listeningAddress) throws IOException {
- super("Incoming Connection Thread");
-
- this.selector = Selector.open();
- this.byteBufferedChannelManager = byteBufferedChannelManager;
-
- if (isListeningThread) {
- this.listeningSocket = ServerSocketChannel.open();
- this.listeningSocket.configureBlocking(false);
- listeningSocket.register(this.selector, SelectionKey.OP_ACCEPT);
- this.listeningSocket.socket().bind(listeningAddress);
- LOG.debug("Listening on " + this.listeningSocket.socket().getLocalSocketAddress());
- } else {
- this.listeningSocket = null;
- }
- }
-
- @Override
- public void run() {
-
- while (!this.isInterrupted()) {
-
- synchronized (this.pendingReadEventSubscribeRequests) {
- while (!this.pendingReadEventSubscribeRequests.isEmpty()) {
- final SelectionKey key = this.pendingReadEventSubscribeRequests.poll();
- final IncomingConnection incomingConnection = (IncomingConnection) key.attachment();
- final SocketChannel socketChannel = (SocketChannel) key.channel();
-
- try {
- final SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
- newKey.attach(incomingConnection);
- } catch (ClosedChannelException e) {
- incomingConnection.reportTransmissionProblem(key, e);
- }
- }
- }
-
- try {
- this.selector.select(500);
- } catch (IOException e) {
- LOG.error(e);
- }
-
- final Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
-
- while (iter.hasNext()) {
- final SelectionKey key = iter.next();
-
- iter.remove();
- if (key.isValid()) {
- if (key.isReadable()) {
- doRead(key);
- } else if (key.isAcceptable()) {
- doAccept(key);
- } else {
- LOG.error("Unknown key: " + key);
- }
- } else {
- LOG.error("Received invalid key: " + key);
- }
- }
- }
-
- // Do cleanup, if necessary
- if (this.listeningSocket != null) {
- try {
- this.listeningSocket.close();
- } catch (IOException ioe) {
- // Actually, we can ignore this exception
- LOG.debug(ioe);
- }
- }
-
- // Finally, close the selector
- try {
- this.selector.close();
- } catch (IOException ioe) {
- LOG.debug(StringUtils.stringifyException(ioe));
- }
- }
-
- private void doAccept(SelectionKey key) {
-
- SocketChannel clientSocket = null;
-
- try {
- clientSocket = this.listeningSocket.accept();
- if (clientSocket == null) {
- LOG.error("Client socket is null");
- return;
- }
- } catch (IOException ioe) {
- LOG.error(ioe);
- return;
- }
-
- final IncomingConnection incomingConnection = new IncomingConnection(this.byteBufferedChannelManager,
- clientSocket);
- SelectionKey clientKey = null;
- try {
- clientSocket.configureBlocking(false);
- clientKey = clientSocket.register(this.selector, SelectionKey.OP_READ);
- clientKey.attach(incomingConnection);
- } catch (IOException ioe) {
- incomingConnection.reportTransmissionProblem(clientKey, ioe);
- }
- }
-
- private void doRead(SelectionKey key) {
-
- final IncomingConnection incomingConnection = (IncomingConnection) key.attachment();
- try {
- incomingConnection.read();
- } catch (EOFException eof) {
- if (incomingConnection.isCloseUnexpected()) {
- final SocketChannel socketChannel = (SocketChannel) key.channel();
- LOG.error("Connection from " + socketChannel.socket().getRemoteSocketAddress()
- + " was closed unexpectedly");
- incomingConnection.reportTransmissionProblem(key, eof);
- } else {
- incomingConnection.closeConnection(key);
- }
- } catch (IOException ioe) {
- incomingConnection.reportTransmissionProblem(key, ioe);
- } catch (InterruptedException e) {
- // Nothing to do here
- } catch (NoBufferAvailableException e) {
- // There are no buffers available, unsubscribe from read event
- final SocketChannel socketChannel = (SocketChannel) key.channel();
- try {
- final SelectionKey newKey = socketChannel.register(this.selector, 0);
- newKey.attach(incomingConnection);
- } catch (ClosedChannelException e1) {
- incomingConnection.reportTransmissionProblem(key, e1);
- }
-
- final BufferAvailabilityListener bal = new IncomingConnectionBufferAvailListener(
- this.pendingReadEventSubscribeRequests, key);
- if (!e.getBufferProvider().registerBufferAvailabilityListener(bal)) {
- // In the meantime, a buffer has become available again, subscribe to read event again
-
- try {
- final SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
- newKey.attach(incomingConnection);
- } catch (ClosedChannelException e1) {
- incomingConnection.reportTransmissionProblem(key, e1);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputChannelContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputChannelContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputChannelContext.java
deleted file mode 100644
index 739ec3d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputChannelContext.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-
-public interface InputChannelContext extends ChannelContext, BufferProvider {
-
- void logQueuedEnvelopes();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputGateContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputGateContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputGateContext.java
deleted file mode 100644
index 0b05262..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputGateContext.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPoolOwner;
-
-public interface InputGateContext extends GateContext {
-
- InputChannelContext createInputChannelContext(ChannelID channelID, InputChannelContext previousContext);
-
- LocalBufferPoolOwner getLocalBufferPoolOwner();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InsufficientResourcesException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InsufficientResourcesException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InsufficientResourcesException.java
deleted file mode 100644
index 87543b8..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InsufficientResourcesException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-/**
- * This exception is thrown by the {@link ByteBufferedChannelManager} to indicate that a task cannot be accepted because
- * there are not enough resources available to safely execute it.
- *
- */
-public final class InsufficientResourcesException extends Exception {
-
- /**
- * The generated serial version UID.
- */
- private static final long serialVersionUID = -8977049569413215169L;
-
- /**
- * Constructs a new insufficient resources exception.
- *
- * @param msg
- * the message describing the exception
- */
- InsufficientResourcesException(final String msg) {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/NetworkConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/NetworkConnectionManager.java
deleted file mode 100644
index 6a5cc47..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/NetworkConnectionManager.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-/**
- * The network connection manager manages incoming and outgoing network connection from and to other hosts.
- * <p>
- * This class is thread-safe.
- *
- */
-public final class NetworkConnectionManager {
-
- /**
- * The default number of threads dealing with outgoing connections.
- */
- private static final int DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS = 1;
-
- /**
- * The default number of connection retries before giving up.
- */
- private static final int DEFAULT_NUMBER_OF_CONNECTION_RETRIES = 10;
-
- /**
- * List of active threads dealing with outgoing connections.
- */
- private final List<OutgoingConnectionThread> outgoingConnectionThreads = new CopyOnWriteArrayList<OutgoingConnectionThread>();
-
- /**
- * Thread dealing with incoming connections.
- */
- private final IncomingConnectionThread incomingConnectionThread;
-
- /**
- * Map containing currently active outgoing connections.
- */
- private final ConcurrentMap<RemoteReceiver, OutgoingConnection> outgoingConnections = new ConcurrentHashMap<RemoteReceiver, OutgoingConnection>();
-
- /**
- * The number of connection retries before giving up.
- */
- private final int numberOfConnectionRetries;
-
- /**
- * A buffer provider for read buffers
- */
- private final ByteBufferedChannelManager byteBufferedChannelManager;
-
- public NetworkConnectionManager(final ByteBufferedChannelManager byteBufferedChannelManager,
- final InetAddress bindAddress, final int dataPort) throws IOException {
-
- final Configuration configuration = GlobalConfiguration.getConfiguration();
-
- this.byteBufferedChannelManager = byteBufferedChannelManager;
-
- // Start the connection threads
- final int numberOfOutgoingConnectionThreads = configuration.getInteger(
- "channel.network.numberOfOutgoingConnectionThreads", DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS);
-
- for (int i = 0; i < numberOfOutgoingConnectionThreads; i++) {
- final OutgoingConnectionThread outgoingConnectionThread = new OutgoingConnectionThread();
- outgoingConnectionThread.start();
- this.outgoingConnectionThreads.add(outgoingConnectionThread);
- }
-
- this.incomingConnectionThread = new IncomingConnectionThread(
- this.byteBufferedChannelManager, true, new InetSocketAddress(bindAddress, dataPort));
- this.incomingConnectionThread.start();
-
- this.numberOfConnectionRetries = configuration.getInteger("channel.network.numberOfConnectionRetries",
- DEFAULT_NUMBER_OF_CONNECTION_RETRIES);
- }
-
- /**
- * Randomly selects one of the active threads dealing with outgoing connections.
- *
- * @return one of the active threads dealing with outgoing connections
- */
- private OutgoingConnectionThread getOutgoingConnectionThread() {
-
- return this.outgoingConnectionThreads.get((int) (this.outgoingConnectionThreads.size() * Math.random()));
- }
-
- /**
- * Queues an envelope for transfer to a particular target host.
- *
- * @param remoteReceiver
- * the address of the remote receiver
- * @param transferEnvelope
- * the envelope to be transfered
- */
- public void queueEnvelopeForTransfer(final RemoteReceiver remoteReceiver, final TransferEnvelope transferEnvelope) {
-
- getOutgoingConnection(remoteReceiver).queueEnvelope(transferEnvelope);
- }
-
- /**
- * Returns (and possibly creates) the outgoing connection for the given target address.
- *
- * @param targetAddress
- * the address of the connection target
- * @return the outgoing connection object
- */
- private OutgoingConnection getOutgoingConnection(final RemoteReceiver remoteReceiver) {
-
- OutgoingConnection outgoingConnection = this.outgoingConnections.get(remoteReceiver);
-
- if (outgoingConnection == null) {
-
- outgoingConnection = new OutgoingConnection(remoteReceiver, getOutgoingConnectionThread(),
- this.numberOfConnectionRetries);
-
- final OutgoingConnection oldEntry = this.outgoingConnections
- .putIfAbsent(remoteReceiver, outgoingConnection);
-
- // We had a race, use the old value
- if (oldEntry != null) {
- outgoingConnection = oldEntry;
- }
- }
-
- return outgoingConnection;
- }
-
- public void shutDown() {
-
- // Interrupt the threads we started
- this.incomingConnectionThread.interrupt();
-
- final Iterator<OutgoingConnectionThread> it = this.outgoingConnectionThreads.iterator();
- while (it.hasNext()) {
- it.next().interrupt();
- }
- }
-
- public void logBufferUtilization() {
-
- System.out.println("\tOutgoing connections:");
-
- final Iterator<Map.Entry<RemoteReceiver, OutgoingConnection>> it = this.outgoingConnections.entrySet()
- .iterator();
-
- while (it.hasNext()) {
-
- final Map.Entry<RemoteReceiver, OutgoingConnection> entry = it.next();
- System.out.println("\t\tOC " + entry.getKey() + ": " + entry.getValue().getNumberOfQueuedWriteBuffers());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java
deleted file mode 100644
index 94463b6..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java
+++ /dev/null
@@ -1,531 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Queue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.DefaultSerializer;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-/**
- * This class represents an outgoing TCP connection through which {@link TransferEnvelope} objects can be sent.
- * {@link TransferEnvelope} objects are received from the {@link ByteBufferedChannelManager} and added to a queue. An
- * additional network thread then takes the envelopes from the queue and transmits them to the respective destination
- * host.
- *
- */
-public class OutgoingConnection {
-
- /**
- * The log object used to report debug information and possible errors.
- */
- private static final Log LOG = LogFactory.getLog(OutgoingConnection.class);
-
- /**
- * The address this outgoing connection is connected to.
- */
- private final RemoteReceiver remoteReceiver;
-
- /**
- * The outgoing connection thread which actually transmits the queued transfer envelopes.
- */
- private final OutgoingConnectionThread connectionThread;
-
- /**
- * The queue of transfer envelopes to be transmitted.
- */
- private final Queue<TransferEnvelope> queuedEnvelopes = new ArrayDeque<TransferEnvelope>();
-
- /**
- * The {@link DefaultSerializer} object used to transform the envelopes into a byte stream.
- */
- private final DefaultSerializer serializer = new DefaultSerializer();
-
- /**
- * The {@link TransferEnvelope} that is currently processed.
- */
- private TransferEnvelope currentEnvelope = null;
-
- /**
- * Stores whether the underlying TCP connection is established. As this variable is accessed by the byte buffered
- * channel manager and the outgoing connection thread, it must be protected by a monitor.
- */
- private boolean isConnected = false;
-
- /**
- * Stores whether is underlying TCP connection is subscribed to the NIO write event. As this variable is accessed by
- * the byte buffered channel and the outgoing connection thread, it must be protected by a monitor.
- */
- private boolean isSubscribedToWriteEvent = false;
-
- /**
- * The overall number of connection retries which shall be performed before a connection error is reported.
- */
- private final int numberOfConnectionRetries;
-
- /**
- * The number of connection retries left before an I/O error is reported.
- */
- private int retriesLeft = 0;
-
- /**
- * The timestamp of the last connection retry.
- */
- private long timstampOfLastRetry = 0;
-
- /**
- * The current selection key representing the interest set of the underlying TCP NIO connection. This variable may
- * only be accessed the the outgoing connection thread.
- */
- private SelectionKey selectionKey = null;
-
- /**
- * The period of time in milliseconds that shall be waited before a connection attempt is considered to be failed.
- */
- private static long RETRYINTERVAL = 1000L; // 1 second
-
- /**
- * Constructs a new outgoing connection object.
- *
- * @param remoteReceiver
- * the address of the destination host this outgoing connection object is supposed to connect to
- * @param connectionThread
- * the connection thread which actually handles the network transfer
- * @param numberOfConnectionRetries
- * the number of connection retries allowed before an I/O error is reported
- */
- public OutgoingConnection(RemoteReceiver remoteReceiver, OutgoingConnectionThread connectionThread,
- int numberOfConnectionRetries) {
-
- this.remoteReceiver = remoteReceiver;
- this.connectionThread = connectionThread;
- this.numberOfConnectionRetries = numberOfConnectionRetries;
- }
-
- /**
- * Adds a new {@link TransferEnvelope} to the queue of envelopes to be transmitted to the destination host of this
- * connection.
- * <p>
- * This method should only be called by the {@link ByteBufferedChannelManager} object.
- *
- * @param transferEnvelope
- * the envelope to be added to the transfer queue
- */
- public void queueEnvelope(TransferEnvelope transferEnvelope) {
-
- synchronized (this.queuedEnvelopes) {
-
- checkConnection();
- this.queuedEnvelopes.add(transferEnvelope);
- }
- }
-
- private void checkConnection() {
-
- synchronized (this.queuedEnvelopes) {
-
- if (!this.isConnected) {
-
- this.retriesLeft = this.numberOfConnectionRetries;
- this.timstampOfLastRetry = System.currentTimeMillis();
- this.connectionThread.triggerConnect(this);
- this.isConnected = true;
- this.isSubscribedToWriteEvent = true;
- } else {
-
- if (!this.isSubscribedToWriteEvent) {
- this.connectionThread.subscribeToWriteEvent(this.selectionKey);
- this.isSubscribedToWriteEvent = true;
- }
- }
-
- }
- }
-
- /**
- * Returns the {@link InetSocketAddress} to the destination host this outgoing connection is supposed to be
- * connected to.
- * <p>
- * This method should be called by the {@link OutgoingConnectionThread} object only.
- *
- * @return the {@link InetSocketAddress} to the destination host this outgoing connection is supposed to be
- * connected to
- */
- public InetSocketAddress getConnectionAddress() {
-
- return this.remoteReceiver.getConnectionAddress();
- }
-
- /**
- * Reports a problem which occurred while establishing the underlying TCP connection to this outgoing connection
- * object. Depending on the number of connection retries left, this method will either try to reestablish the TCP
- * connection or report an I/O error to all tasks which have queued envelopes for this connection. In the latter
- * case all queued envelopes will be dropped and all included buffers will be freed.
- * <p>
- * This method should only be called by the {@link OutgoingConnectionThread} object.
- *
- * @param ioe
- * thrown if an error occurs while reseting the underlying TCP connection
- */
- public void reportConnectionProblem(IOException ioe) {
-
- // First, write exception to log
- final long currentTime = System.currentTimeMillis();
- if (currentTime - this.timstampOfLastRetry >= RETRYINTERVAL) {
- LOG.error("Cannot connect to " + this.remoteReceiver + ", " + this.retriesLeft + " retries left");
- }
-
- synchronized (this.queuedEnvelopes) {
-
- if (this.selectionKey != null) {
-
- final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
- if (socketChannel != null) {
- try {
- socketChannel.close();
- } catch (IOException e) {
- LOG.debug("Error while trying to close the socket channel to " + this.remoteReceiver);
- }
- }
-
- this.selectionKey.cancel();
- this.selectionKey = null;
- this.isConnected = false;
- this.isSubscribedToWriteEvent = false;
- }
-
- if (hasRetriesLeft(currentTime)) {
- this.connectionThread.triggerConnect(this);
- this.isConnected = true;
- this.isSubscribedToWriteEvent = true;
- return;
- }
-
- // Error is fatal
- LOG.error(ioe);
-
- // Notify source of current envelope and release buffer
- if (this.currentEnvelope != null) {
- if (this.currentEnvelope.getBuffer() != null) {
- this.currentEnvelope.getBuffer().recycleBuffer();
- this.currentEnvelope = null;
- }
- }
-
- // Notify all other tasks which are waiting for data to be transmitted
- final Iterator<TransferEnvelope> iter = this.queuedEnvelopes.iterator();
- while (iter.hasNext()) {
- final TransferEnvelope envelope = iter.next();
- iter.remove();
- // Recycle the buffer inside the envelope
- if (envelope.getBuffer() != null) {
- envelope.getBuffer().recycleBuffer();
- }
- }
-
- this.queuedEnvelopes.clear();
- }
- }
-
- /**
- * Reports an I/O error which occurred while writing data to the TCP connection. As a result of the I/O error the
- * connection is closed and the interest keys are canceled. Moreover, the task which queued the currently
- * transmitted transfer envelope is notified about the error and the current envelope is dropped. If the current
- * envelope contains a buffer, the buffer is freed.
- * <p>
- * This method should only be called by the {@link OutgoingConnectionThread} object.
- *
- * @param ioe
- * thrown if an error occurs while reseting the connection
- */
- public void reportTransmissionProblem(IOException ioe) {
-
- final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
-
- // First, write exception to log
- if (this.currentEnvelope != null) {
- LOG.error("The connection between " + socketChannel.socket().getLocalAddress() + " and "
- + socketChannel.socket().getRemoteSocketAddress()
- + " experienced an IOException for transfer envelope " + this.currentEnvelope.getSequenceNumber());
- } else {
- LOG.error("The connection between " + socketChannel.socket().getLocalAddress() + " and "
- + socketChannel.socket().getRemoteSocketAddress() + " experienced an IOException");
- }
-
- // Close the connection and cancel the interest key
- synchronized (this.queuedEnvelopes) {
- try {
- LOG.debug("Closing connection to " + socketChannel.socket().getRemoteSocketAddress());
- socketChannel.close();
- } catch (IOException e) {
- LOG.debug("An error occurred while responding to an IOException");
- LOG.debug(e);
- }
-
- this.selectionKey.cancel();
-
- // Error is fatal
- LOG.error(ioe);
-
- // Trigger new connection if there are more envelopes to be transmitted
- if (this.queuedEnvelopes.isEmpty()) {
- this.isConnected = false;
- this.isSubscribedToWriteEvent = false;
- } else {
- this.connectionThread.triggerConnect(this);
- this.isConnected = true;
- this.isSubscribedToWriteEvent = true;
- }
-
- // We must assume the current envelope is corrupted so we notify the task which created it.
- if (this.currentEnvelope != null) {
- if (this.currentEnvelope.getBuffer() != null) {
- this.currentEnvelope.getBuffer().recycleBuffer();
- this.currentEnvelope = null;
- }
- }
- }
- }
-
- /**
- * Checks whether further retries are left for establishing the underlying TCP connection.
- *
- * @param currentTime
- * the current system time in milliseconds since January 1st, 1970
- * @return <code>true</code> if there are retries left, <code>false</code> otherwise
- */
- private boolean hasRetriesLeft(long currentTime) {
-
- if (currentTime - this.timstampOfLastRetry >= RETRYINTERVAL) {
- this.retriesLeft--;
- this.timstampOfLastRetry = currentTime;
- if (this.retriesLeft == 0) {
- return false;
- }
- }
-
- return true;
- }
-
- /**
- * Writes the content of the current {@link TransferEnvelope} object to the underlying TCP connection.
- * <p>
- * This method should only be called by the {@link OutgoingConnectionThread} object.
- *
- * @return <code>true</code> if there is more data from this/other queued envelopes to be written to this channel
- * @throws IOException
- * thrown if an error occurs while writing the data to the channel
- */
- public boolean write() throws IOException {
-
- final WritableByteChannel writableByteChannel = (WritableByteChannel) this.selectionKey.channel();
-
- if (this.currentEnvelope == null) {
- synchronized (this.queuedEnvelopes) {
- if (this.queuedEnvelopes.isEmpty()) {
- return false;
- } else {
- this.currentEnvelope = this.queuedEnvelopes.peek();
- this.serializer.setTransferEnvelope(this.currentEnvelope);
- }
- }
- }
-
- if (!this.serializer.write(writableByteChannel)) {
-
- // Make sure we recycle the attached memory or file buffers correctly
- if (this.currentEnvelope.getBuffer() != null) {
- this.currentEnvelope.getBuffer().recycleBuffer();
- }
-
- synchronized (this.queuedEnvelopes) {
- this.queuedEnvelopes.poll();
- this.currentEnvelope = null;
- }
- }
-
- return true;
- }
-
- /**
- * Requests to close the underlying TCP connection. The request is ignored if at least one {@link TransferEnvelope}
- * is queued.
- * <p>
- * This method should only be called by the {@link OutgoingConnectionThread} object.
- *
- * @throws IOException
- * thrown if an error occurs while closing the TCP connection
- */
- public void requestClose() throws IOException {
-
- synchronized (this.queuedEnvelopes) {
-
- if (this.queuedEnvelopes.isEmpty()) {
-
- if (this.isSubscribedToWriteEvent) {
-
- this.connectionThread.unsubscribeFromWriteEvent(this.selectionKey);
- this.isSubscribedToWriteEvent = false;
- }
- }
- }
- }
-
- /**
- * Closes the underlying TCP connection if no more {@link TransferEnvelope} objects are in the transmission queue.
- * <p>
- * This method should only be called by the {@link OutgoingConnectionThread} object.
- *
- * @throws IOException
- */
- public void closeConnection() throws IOException {
-
- synchronized (this.queuedEnvelopes) {
-
- if (!this.queuedEnvelopes.isEmpty()) {
- return;
- }
-
- if (this.selectionKey != null) {
-
- final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
- socketChannel.close();
- this.selectionKey.cancel();
- this.selectionKey = null;
- }
-
- this.isConnected = false;
- this.isSubscribedToWriteEvent = false;
- }
- }
-
- /**
- * Returns the number of queued {@link TransferEnvelope} objects with the given source channel ID.
- *
- * @param sourceChannelID
- * the source channel ID to count the queued envelopes for
- * @return the number of queued transfer envelopes with the given source channel ID
- */
- public int getNumberOfQueuedEnvelopesFromChannel(final ChannelID sourceChannelID) {
-
- synchronized (this.queuedEnvelopes) {
-
- int number = 0;
-
- final Iterator<TransferEnvelope> it = this.queuedEnvelopes.iterator();
- while (it.hasNext()) {
- final TransferEnvelope te = it.next();
- if (sourceChannelID.equals(te.getSource())) {
- number++;
- }
- }
-
- return number;
- }
- }
-
- /**
- * Removes all queued {@link TransferEnvelope} objects from the transmission which match the given source channel
- * ID.
- *
- * @param sourceChannelID
- * the source channel ID of the transfered transfer envelopes to be dropped
- */
- public void dropAllQueuedEnvelopesFromChannel(final ChannelID sourceChannelID) {
-
- synchronized (this.queuedEnvelopes) {
-
- final Iterator<TransferEnvelope> it = this.queuedEnvelopes.iterator();
- while (it.hasNext()) {
- final TransferEnvelope te = it.next();
- if (sourceChannelID.equals(te.getSource())) {
- it.remove();
- if (te.getBuffer() != null) {
- te.getBuffer().recycleBuffer();
- }
- }
- }
- }
- }
-
- /**
- * Checks whether this outgoing connection object manages an active connection or can be removed by the
- * {@link ByteBufferedChannelManager} object.
- * <p>
- * This method should only be called by the byte buffered channel manager.
- *
- * @return <code>true</code> if this object is no longer manages an active connection and can be removed,
- * <code>false</code> otherwise.
- */
- public boolean canBeRemoved() {
-
- synchronized (this.queuedEnvelopes) {
-
- if (this.isConnected) {
- return false;
- }
-
- if (this.currentEnvelope != null) {
- return false;
- }
-
- return this.queuedEnvelopes.isEmpty();
- }
- }
-
- /**
- * Sets the selection key representing the interest set of the underlying TCP NIO connection.
- *
- * @param selectionKey
- * the selection of the underlying TCP connection
- */
- public void setSelectionKey(SelectionKey selectionKey) {
- this.selectionKey = selectionKey;
- }
-
- /**
- * Returns the number of currently queued envelopes which contain a write buffer.
- *
- * @return the number of currently queued envelopes which contain a write buffer
- */
- public int getNumberOfQueuedWriteBuffers() {
-
- int retVal = 0;
-
- synchronized (this.queuedEnvelopes) {
-
- final Iterator<TransferEnvelope> it = this.queuedEnvelopes.iterator();
- while (it.hasNext()) {
-
- final TransferEnvelope envelope = it.next();
- if (envelope.getBuffer() != null) {
- ++retVal;
- }
- }
- }
-
- return retVal;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnectionThread.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnectionThread.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnectionThread.java
deleted file mode 100644
index ef03a9c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnectionThread.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Queue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.util.StringUtils;
-
-public class OutgoingConnectionThread extends Thread {
-
- /**
- * The minimum time a TCP connection must be idle it is closed.
- */
- private static final long MIN_IDLE_TIME_BEFORE_CLOSE = 80000L; // 80 seconds
-
- private static final Log LOG = LogFactory.getLog(OutgoingConnectionThread.class);
-
- private final Selector selector;
-
- private final Queue<OutgoingConnection> pendingConnectionRequests = new ArrayDeque<OutgoingConnection>();
-
- private final Queue<SelectionKey> pendingWriteEventSubscribeRequests = new ArrayDeque<SelectionKey>();
-
- private final Map<OutgoingConnection, Long> connectionsToClose = new HashMap<OutgoingConnection, Long>();
-
- public OutgoingConnectionThread() throws IOException {
- super("Outgoing Connection Thread");
-
- this.selector = Selector.open();
- }
-
-
- @Override
- public void run() {
-
- while (!isInterrupted()) {
-
- synchronized (this.pendingConnectionRequests) {
-
- if (!this.pendingConnectionRequests.isEmpty()) {
-
- final OutgoingConnection outgoingConnection = this.pendingConnectionRequests.poll();
- try {
- final SocketChannel socketChannel = SocketChannel.open();
- socketChannel.configureBlocking(false);
- final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
- socketChannel.connect(outgoingConnection.getConnectionAddress());
- key.attach(outgoingConnection);
- } catch (final IOException ioe) {
- // IOException is reported by separate thread to avoid deadlocks
- final Runnable reporterThread = new Runnable() {
-
- @Override
- public void run() {
- outgoingConnection.reportConnectionProblem(ioe);
- }
- };
- new Thread(reporterThread).start();
- }
- }
- }
-
- synchronized (this.pendingWriteEventSubscribeRequests) {
-
- if (!this.pendingWriteEventSubscribeRequests.isEmpty()) {
- final SelectionKey oldSelectionKey = this.pendingWriteEventSubscribeRequests.poll();
- final OutgoingConnection outgoingConnection = (OutgoingConnection) oldSelectionKey.attachment();
- final SocketChannel socketChannel = (SocketChannel) oldSelectionKey.channel();
-
- try {
- final SelectionKey newSelectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ
- | SelectionKey.OP_WRITE);
- newSelectionKey.attach(outgoingConnection);
- outgoingConnection.setSelectionKey(newSelectionKey);
- } catch (final IOException ioe) {
- // IOException is reported by separate thread to avoid deadlocks
- final Runnable reporterThread = new Runnable() {
-
- @Override
- public void run() {
- outgoingConnection.reportTransmissionProblem(ioe);
- }
- };
- new Thread(reporterThread).start();
- }
- }
- }
-
- synchronized (this.connectionsToClose) {
-
- final Iterator<Map.Entry<OutgoingConnection, Long>> closeIt = this.connectionsToClose.entrySet()
- .iterator();
- final long now = System.currentTimeMillis();
- while (closeIt.hasNext()) {
-
- final Map.Entry<OutgoingConnection, Long> entry = closeIt.next();
- if ((entry.getValue().longValue() + MIN_IDLE_TIME_BEFORE_CLOSE) < now) {
- final OutgoingConnection outgoingConnection = entry.getKey();
- closeIt.remove();
- // Create new thread to close connection to avoid deadlocks
- final Runnable closeThread = new Runnable() {
-
- @Override
- public void run() {
- try {
- outgoingConnection.closeConnection();
- } catch (IOException ioe) {
- outgoingConnection.reportTransmissionProblem(ioe);
- }
- }
- };
-
- new Thread(closeThread).start();
- }
-
- }
- }
-
- try {
- this.selector.select(10);
- } catch (IOException e) {
- LOG.error(e);
- }
-
- final Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
-
- while (iter.hasNext()) {
- final SelectionKey key = iter.next();
-
- iter.remove();
- if (key.isValid()) {
- if (key.isConnectable()) {
- doConnect(key);
- } else {
- if (key.isReadable()) {
- doRead(key);
- // A read will always result in an exception, so the write key will not be valid anymore
- continue;
- }
- if (key.isWritable()) {
- doWrite(key);
- }
- }
- } else {
- LOG.error("Received invalid key: " + key);
- }
- }
- }
-
- // Finally, try to close the selector
- try {
- this.selector.close();
- } catch (IOException ioe) {
- LOG.debug(StringUtils.stringifyException(ioe));
- }
- }
-
- private void doConnect(SelectionKey key) {
-
- final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
- final SocketChannel socketChannel = (SocketChannel) key.channel();
- try {
- while (!socketChannel.finishConnect()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e1) {
- LOG.error(e1);
- }
- }
-
- final SelectionKey channelKey = socketChannel.register(selector, SelectionKey.OP_WRITE
- | SelectionKey.OP_READ);
- outgoingConnection.setSelectionKey(channelKey);
- channelKey.attach(outgoingConnection);
-
- } catch (IOException ioe) {
- outgoingConnection.reportConnectionProblem(ioe);
- }
- }
-
- private void doWrite(SelectionKey key) {
-
- final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
-
- try {
-
- if (!outgoingConnection.write()) {
- // Try to close the connection
- outgoingConnection.requestClose();
- }
-
- } catch (IOException ioe) {
- outgoingConnection.reportTransmissionProblem(ioe);
- }
- }
-
- private void doRead(SelectionKey key) {
-
- final SocketChannel socketChannel = (SocketChannel) key.channel();
- final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
- final ByteBuffer buf = ByteBuffer.allocate(8);
-
- try {
-
- if (socketChannel.read(buf) == -1) {
- outgoingConnection.reportTransmissionProblem(new IOException(
- "Read unexpected EOF from channel"));
- } else {
- LOG.error("Outgoing connection read real data from channel");
- }
- } catch (IOException ioe) {
- outgoingConnection.reportTransmissionProblem(ioe);
- }
- }
-
- public void triggerConnect(OutgoingConnection outgoingConnection) {
-
- synchronized (this.pendingConnectionRequests) {
- this.pendingConnectionRequests.add(outgoingConnection);
- }
- }
-
- public void unsubscribeFromWriteEvent(SelectionKey selectionKey) throws IOException {
-
- final SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
- final OutgoingConnection outgoingConnection = (OutgoingConnection) selectionKey.attachment();
-
- final SelectionKey newSelectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
- newSelectionKey.attach(outgoingConnection);
- outgoingConnection.setSelectionKey(newSelectionKey);
-
- synchronized (this.connectionsToClose) {
- this.connectionsToClose.put(outgoingConnection, Long.valueOf(System.currentTimeMillis()));
- }
- }
-
- public void subscribeToWriteEvent(SelectionKey selectionKey) {
-
- synchronized (this.pendingWriteEventSubscribeRequests) {
- this.pendingWriteEventSubscribeRequests.add(selectionKey);
- }
- synchronized (this.connectionsToClose) {
- this.connectionsToClose.remove((OutgoingConnection) selectionKey.attachment());
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelContext.java
deleted file mode 100644
index 7d8a571..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelContext.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-public interface OutputChannelContext extends ChannelContext {
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelForwardingChain.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelForwardingChain.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelForwardingChain.java
deleted file mode 100644
index d9a16b7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelForwardingChain.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-public final class OutputChannelForwardingChain {
-
- private final Queue<AbstractEvent> incomingEventQueue = new LinkedBlockingDeque<AbstractEvent>();
-
- private final AbstractOutputChannelForwarder first;
-
- private final AbstractOutputChannelForwarder last;
-
- public OutputChannelForwardingChain(final AbstractOutputChannelForwarder first,
- final AbstractOutputChannelForwarder last) {
-
- if (first == null) {
- throw new IllegalArgumentException("Argument first must not be null");
- }
-
- if (last == null) {
- throw new IllegalArgumentException("Argument last must not be null");
- }
-
- this.first = first;
- this.last = last;
- }
-
- public void pushEnvelope(final TransferEnvelope transferEnvelope) throws IOException, InterruptedException {
-
- this.first.push(transferEnvelope);
- }
-
- public TransferEnvelope pullEnvelope() {
-
- return this.last.pull();
- }
-
- public void processEvent(final AbstractEvent event) {
-
- this.first.processEvent(event);
- }
-
- public boolean anyForwarderHasDataLeft() throws IOException, InterruptedException {
-
- return this.first.hasDataLeft();
- }
-
- public void destroy() {
-
- this.first.destroy();
- }
-
- public void processQueuedEvents() {
-
- AbstractEvent event = this.incomingEventQueue.poll();
- while (event != null) {
-
- this.first.processEvent(event);
- event = this.incomingEventQueue.poll();
- }
- }
-
- void offerEvent(final AbstractEvent event) {
- this.incomingEventQueue.offer(event);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputGateContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputGateContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputGateContext.java
deleted file mode 100644
index fb2022a..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputGateContext.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-
-public interface OutputGateContext extends GateContext {
-
- OutputChannelContext createOutputChannelContext(ChannelID channelID, OutputChannelContext previousContext,
- boolean isReceiverRunning, boolean mergeSpillBuffers);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ReceiverNotFoundEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ReceiverNotFoundEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ReceiverNotFoundEvent.java
deleted file mode 100644
index 0b8f351..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ReceiverNotFoundEvent.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.EventList;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-/**
- * An unknown receiver event can be used by the framework to inform a sender task that the delivery of a
- * {@link TransferEnvelope} has failed since the receiver could not be found.
- *
- */
-public final class ReceiverNotFoundEvent extends AbstractEvent {
-
- /**
- * The sequence number that will be set for transfer envelopes which contain the receiver not found event.
- */
- private static final int RECEIVER_NOT_FOUND_SEQUENCE_NUMBER = 0;
-
- /**
- * The ID of the receiver which could not be found
- */
- private ChannelID receiverID;
-
- /**
- * The sequence number of the envelope this event refers to
- */
- private int sequenceNumber;
-
- /**
- * Constructs a new unknown receiver event.
- *
- * @param receiverID
- * the ID of the receiver which could not be found
- * @param sequenceNumber
- * the sequence number of the envelope this event refers to
- */
- public ReceiverNotFoundEvent(final ChannelID receiverID, final int sequenceNumber) {
-
- if (receiverID == null) {
- throw new IllegalArgumentException("Argument unknownReceiverID must not be null");
- }
-
- if (sequenceNumber < 0) {
- throw new IllegalArgumentException("Argument sequenceNumber must be non-negative");
- }
-
- this.receiverID = receiverID;
- this.sequenceNumber = sequenceNumber;
- }
-
- /**
- * Default constructor for serialization/deserialization.
- */
- public ReceiverNotFoundEvent() {
-
- this.receiverID = new ChannelID();
- }
-
- /**
- * Returns the ID of the receiver which could not be found.
- *
- * @return the ID of the receiver which could not be found
- */
- public ChannelID getReceiverID() {
-
- return this.receiverID;
- }
-
- /**
- * Returns the sequence number of the envelope this event refers to.
- *
- * @return the sequence number of the envelope this event refers to
- */
- public int getSequenceNumber() {
-
- return this.sequenceNumber;
- }
-
-
- @Override
- public void write(final DataOutput out) throws IOException {
-
- this.receiverID.write(out);
- out.writeInt(this.sequenceNumber);
- }
-
-
- @Override
- public void read(final DataInput in) throws IOException {
-
- this.receiverID.read(in);
- this.sequenceNumber = in.readInt();
- }
-
- /**
- * Creates a transfer envelope which only contains a ReceiverNotFoundEvent.
- *
- * @param jobID
- * the ID of the job the event relates to.
- * @param receiver
- * the channel ID of the receiver that could not be found
- * @param sequenceNumber
- * the sequence number of the transfer envelope which caused the creation of this event
- * @return a transfer envelope which only contains a ReceiverNotFoundEvent
- */
- public static TransferEnvelope createEnvelopeWithEvent(final JobID jobID, final ChannelID receiver,
- final int sequenceNumber) {
-
- final TransferEnvelope transferEnvelope = new TransferEnvelope(RECEIVER_NOT_FOUND_SEQUENCE_NUMBER, jobID,
- receiver);
-
- final ReceiverNotFoundEvent unknownReceiverEvent = new ReceiverNotFoundEvent(receiver, sequenceNumber);
- transferEnvelope.addEvent(unknownReceiverEvent);
-
- return transferEnvelope;
- }
-
- /**
- * Checks if the given envelope only contains a ReceiverNotFoundEvent.
- *
- * @param transferEnvelope
- * the envelope to be checked
- * @return <code>true</code> if the envelope only contains a ReceiverNotFoundEvent, <code>false</code> otherwise
- */
- public static boolean isReceiverNotFoundEvent(final TransferEnvelope transferEnvelope) {
-
- if (transferEnvelope.getSequenceNumber() != RECEIVER_NOT_FOUND_SEQUENCE_NUMBER) {
- return false;
- }
-
- if (transferEnvelope.getBuffer() != null) {
- return false;
- }
-
- final EventList eventList = transferEnvelope.getEventList();
- if (eventList == null) {
- return false;
- }
-
- if (eventList.size() != 1) {
- return false;
- }
-
- if (!(eventList.get(0) instanceof ReceiverNotFoundEvent)) {
- return false;
- }
-
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/RemoteReceiver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/RemoteReceiver.java
deleted file mode 100644
index db4e412..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/RemoteReceiver.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * Objects of this class uniquely identify a connection to a remote {@link TaskManager}.
- *
- */
-public final class RemoteReceiver implements IOReadableWritable {
-
- /**
- * The address of the connection to the remote {@link TaskManager}.
- */
- private InetSocketAddress connectionAddress;
-
- /**
- * The index of the connection to the remote {@link TaskManager}.
- */
- private int connectionIndex;
-
- /**
- * Constructs a new remote receiver object.
- *
- * @param connectionAddress
- * the address of the connection to the remote {@link TaskManager}
- * @param connectionIndex
- * the index of the connection to the remote {@link TaskManager}
- */
- public RemoteReceiver(final InetSocketAddress connectionAddress, final int connectionIndex) {
-
- if (connectionAddress == null) {
- throw new IllegalArgumentException("Argument connectionAddress must not be null");
- }
-
- if (connectionIndex < 0) {
- throw new IllegalArgumentException("Argument connectionIndex must be a non-negative integer number");
- }
-
- this.connectionAddress = connectionAddress;
- this.connectionIndex = connectionIndex;
- }
-
- /**
- * Default constructor for serialization/deserialization.
- */
- public RemoteReceiver() {
- this.connectionAddress = null;
- this.connectionIndex = -1;
- }
-
- /**
- * Returns the address of the connection to the remote {@link TaskManager}.
- *
- * @return the address of the connection to the remote {@link TaskManager}
- */
- public InetSocketAddress getConnectionAddress() {
-
- return this.connectionAddress;
- }
-
- /**
- * Returns the index of the connection to the remote {@link TaskManager}.
- *
- * @return the index of the connection to the remote {@link TaskManager}
- */
- public int getConnectionIndex() {
-
- return this.connectionIndex;
- }
-
-
- @Override
- public int hashCode() {
-
- return this.connectionAddress.hashCode() + (31 * this.connectionIndex);
- }
-
-
- @Override
- public boolean equals(final Object obj) {
-
- if (!(obj instanceof RemoteReceiver)) {
- return false;
- }
-
- final RemoteReceiver rr = (RemoteReceiver) obj;
- if (!this.connectionAddress.equals(rr.connectionAddress)) {
- return false;
- }
-
- if (this.connectionIndex != rr.connectionIndex) {
- return false;
- }
-
- return true;
- }
-
-
- @Override
- public void write(final DataOutput out) throws IOException {
-
- final InetAddress ia = this.connectionAddress.getAddress();
- out.writeInt(ia.getAddress().length);
- out.write(ia.getAddress());
- out.writeInt(this.connectionAddress.getPort());
-
- out.writeInt(this.connectionIndex);
- }
-
-
- @Override
- public void read(final DataInput in) throws IOException {
-
- final int addr_length = in.readInt();
- final byte[] address = new byte[addr_length];
- in.readFully(address);
-
- InetAddress ia = null;
- try {
- ia = InetAddress.getByAddress(address);
- } catch (UnknownHostException uhe) {
- throw new IOException(StringUtils.stringifyException(uhe));
- }
- final int port = in.readInt();
- this.connectionAddress = new InetSocketAddress(ia, port);
-
- this.connectionIndex = in.readInt();
- }
-
-
- @Override
- public String toString() {
-
- return this.connectionAddress + " (" + this.connectionIndex + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/SenderHintEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/SenderHintEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/SenderHintEvent.java
deleted file mode 100644
index a6aebb1..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/SenderHintEvent.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.EventList;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-public final class SenderHintEvent extends AbstractEvent {
-
- /**
- * The sequence number that will be set for transfer envelopes which contain the sender hint event.
- */
- private static final int SENDER_HINT_SEQUENCE_NUMBER = 0;
-
- private final ChannelID source;
-
- private final RemoteReceiver remoteReceiver;
-
- SenderHintEvent(final ChannelID source, final RemoteReceiver remoteReceiver) {
-
- if (source == null) {
- throw new IllegalArgumentException("Argument source must not be null");
- }
-
- if (remoteReceiver == null) {
- throw new IllegalArgumentException("Argument remoteReceiver must not be null");
- }
-
- this.source = source;
- this.remoteReceiver = remoteReceiver;
- }
-
- public SenderHintEvent() {
-
- this.source = new ChannelID();
- this.remoteReceiver = new RemoteReceiver();
- }
-
- public ChannelID getSource() {
-
- return this.source;
- }
-
- public RemoteReceiver getRemoteReceiver() {
-
- return this.remoteReceiver;
- }
-
-
- @Override
- public void write(final DataOutput out) throws IOException {
-
- this.source.write(out);
- this.remoteReceiver.write(out);
- }
-
-
- @Override
- public void read(final DataInput in) throws IOException {
-
- this.source.read(in);
- this.remoteReceiver.read(in);
- }
-
- public static TransferEnvelope createEnvelopeWithEvent(final TransferEnvelope originalEnvelope,
- final ChannelID source, final RemoteReceiver remoteReceiver) {
-
- final TransferEnvelope transferEnvelope = new TransferEnvelope(SENDER_HINT_SEQUENCE_NUMBER,
- originalEnvelope.getJobID(), originalEnvelope.getSource());
-
- final SenderHintEvent senderEvent = new SenderHintEvent(source, remoteReceiver);
- transferEnvelope.addEvent(senderEvent);
-
- return transferEnvelope;
- }
-
- static boolean isSenderHintEvent(final TransferEnvelope transferEnvelope) {
-
- if (transferEnvelope.getSequenceNumber() != SENDER_HINT_SEQUENCE_NUMBER) {
- return false;
- }
-
- if (transferEnvelope.getBuffer() != null) {
- return false;
- }
-
- final EventList eventList = transferEnvelope.getEventList();
- if (eventList == null) {
- return false;
- }
-
- if (eventList.size() != 1) {
- return false;
- }
-
- if (!(eventList.get(0) instanceof SenderHintEvent)) {
- return false;
- }
-
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/TaskContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/TaskContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/TaskContext.java
deleted file mode 100644
index 6c41a4f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/TaskContext.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPoolOwner;
-
-public interface TaskContext extends LocalBufferPoolOwner {
-
- OutputGateContext createOutputGateContext(GateID gateID);
-
- InputGateContext createInputGateContext(GateID gateID);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/UnexpectedEnvelopeEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/UnexpectedEnvelopeEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/UnexpectedEnvelopeEvent.java
deleted file mode 100644
index b7d59c6..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/UnexpectedEnvelopeEvent.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-
-/**
- * This event is sent by an {@link InputChannelContext}. It indicates that the input channel context has received a
- * {@link TransferEnvelope} with a lower sequence number than expected. The typical reason for this is that data is
- * being replayed from a checkpoint. With the help of this event it is possible to request the sender to skip sending
- * transfer envelopes up to the given expected sequence number.
- *
- */
-public final class UnexpectedEnvelopeEvent extends AbstractEvent {
-
- /**
- * The expected sequence number.
- */
- private int expectedSequenceNumber;
-
- /**
- * Constructs a new unexpected envelope event.
- *
- * @param expectedSequenceNumber
- * the expected sequence number
- */
- public UnexpectedEnvelopeEvent(final int expectedSequenceNumber) {
-
- if (expectedSequenceNumber < 0) {
- throw new IllegalArgumentException("Argument expectedSequenceNumber must be non-negative.");
- }
-
- this.expectedSequenceNumber = expectedSequenceNumber;
- }
-
- /**
- * Default constructor for serialization/deserialization.
- */
- public UnexpectedEnvelopeEvent() {
- }
-
- /**
- * Returns the expected sequence number.
- *
- * @return the expected sequence number
- */
- public int getExpectedSequenceNumber() {
-
- return this.expectedSequenceNumber;
- }
-
-
- @Override
- public void write(final DataOutput out) throws IOException {
-
- out.writeInt(this.expectedSequenceNumber);
- }
-
-
- @Override
- public void read(final DataInput in) throws IOException {
-
- this.expectedSequenceNumber = in.readInt();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
deleted file mode 100644
index ed845e8..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ExecutorThreadFactory implements ThreadFactory {
-
- public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
-
- private static final String THREAD_NAME = "Nephele Executor Thread ";
-
- private final AtomicInteger threadNumber = new AtomicInteger(1);
-
-
- private ExecutorThreadFactory() {}
-
-
- public Thread newThread(Runnable target) {
- Thread t = new Thread(target, THREAD_NAME + threadNumber.getAndIncrement());
- t.setDaemon(true);
- return t;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ForwardingBarrier.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ForwardingBarrier.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ForwardingBarrier.java
deleted file mode 100644
index 4a104d7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ForwardingBarrier.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.AbstractOutputChannelForwarder;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.UnexpectedEnvelopeEvent;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-public final class ForwardingBarrier extends AbstractOutputChannelForwarder {
-
- private static final Log LOG = LogFactory.getLog(ForwardingBarrier.class);
-
- private final ChannelID outputChannelID;
-
- private int forwardingBarrier = -1;
-
- public ForwardingBarrier(final ChannelID outputChannelID, final AbstractOutputChannelForwarder next) {
- super(next);
-
- if (next == null) {
- throw new IllegalArgumentException("Argument next must not be null");
- }
-
- this.outputChannelID = outputChannelID;
- }
-
-
- @Override
- public void push(final TransferEnvelope transferEnvelope) throws IOException, InterruptedException {
-
- if (transferEnvelope.getSequenceNumber() < this.forwardingBarrier) {
- recycleTransferEnvelope(transferEnvelope);
- return;
- }
-
- getNext().push(transferEnvelope);
- }
-
-
- @Override
- public void processEvent(final AbstractEvent event) {
-
- if (event instanceof UnexpectedEnvelopeEvent) {
-
- final UnexpectedEnvelopeEvent uee = (UnexpectedEnvelopeEvent) event;
- if (uee.getExpectedSequenceNumber() > this.forwardingBarrier) {
- this.forwardingBarrier = uee.getExpectedSequenceNumber();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting forwarding barrier to sequence number " + this.forwardingBarrier
- + " for output channel " + this.outputChannelID);
- }
- }
- }
-
- getNext().processEvent(event);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeDispatcher.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeDispatcher.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeDispatcher.java
deleted file mode 100644
index 5f14743..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeDispatcher.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.taskmanager.bytebuffered.AbstractOutputChannelForwarder;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
-
-public final class RuntimeDispatcher extends AbstractOutputChannelForwarder {
-
- private final TransferEnvelopeDispatcher dispatcher;
-
- public RuntimeDispatcher(final TransferEnvelopeDispatcher dispatcher) {
- super(null);
-
- this.dispatcher = dispatcher;
- }
-
-
- @Override
- public void push(final TransferEnvelope transferEnvelope) throws IOException, InterruptedException {
-
- this.dispatcher.processEnvelopeFromOutputChannel(transferEnvelope);
- }
-}