You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:40 UTC
[05/30] Replace custom Java NIO TCP/IP code with Netty 4 library
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
new file mode 100644
index 0000000..ae67f42
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
@@ -0,0 +1,251 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.runtime.io.network.ChannelManager;
+import eu.stratosphere.runtime.io.network.Envelope;
+import eu.stratosphere.runtime.io.network.RemoteReceiver;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.Future;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class NettyConnectionManager {
+
+ private static final Log LOG = LogFactory.getLog(NettyConnectionManager.class);
+
+ private final ChannelManager channelManager;
+
+ private final ServerBootstrap in;
+
+ private final Bootstrap out;
+
+ private final ConcurrentMap<RemoteReceiver, Object> outConnections;
+
+ public NettyConnectionManager(ChannelManager channelManager, InetAddress bindAddress, int bindPort,
+ int bufferSize, int numInThreads, int numOutThreads,
+ int lowWaterMark, int highWaterMark) {
+ this.outConnections = new ConcurrentHashMap<RemoteReceiver, Object>();
+ this.channelManager = channelManager;
+
+ // --------------------------------------------------------------------
+
+ int defaultNumThreads = Math.max(Runtime.getRuntime().availableProcessors() / 4, 1);
+ numInThreads = (numInThreads == -1) ? defaultNumThreads : numInThreads;
+ numOutThreads = (numOutThreads == -1) ? defaultNumThreads : numOutThreads;
+ LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", numInThreads, numOutThreads));
+
+ lowWaterMark = (lowWaterMark == -1) ? bufferSize / 2 : lowWaterMark;
+ highWaterMark = (highWaterMark == -1) ? bufferSize : highWaterMark;
+ LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.", lowWaterMark, highWaterMark));
+
+ // --------------------------------------------------------------------
+ // server bootstrap (incoming connections)
+ // --------------------------------------------------------------------
+ this.in = new ServerBootstrap();
+ this.in.group(new NioEventLoopGroup(numInThreads))
+ .channel(NioServerSocketChannel.class)
+ .localAddress(bindAddress, bindPort)
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel channel) throws Exception {
+ channel.pipeline()
+ .addLast(new InboundEnvelopeDecoder(NettyConnectionManager.this.channelManager))
+ .addLast(new InboundEnvelopeDispatcherHandler(NettyConnectionManager.this.channelManager));
+ }
+ })
+ .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize))
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+
+ // --------------------------------------------------------------------
+ // client bootstrap (outgoing connections)
+ // --------------------------------------------------------------------
+ this.out = new Bootstrap();
+ this.out.group(new NioEventLoopGroup(numOutThreads))
+ .channel(NioSocketChannel.class)
+ .handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel channel) throws Exception {
+ channel.pipeline()
+ .addLast(new OutboundEnvelopeEncoder());
+ }
+ })
+ .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, lowWaterMark)
+ .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, highWaterMark)
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.TCP_NODELAY, false)
+ .option(ChannelOption.SO_KEEPALIVE, true);
+
+ try {
+ this.in.bind().sync();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Could not bind server socket for incoming connections.");
+ }
+ }
+
+ public void shutdown() {
+ Future<?> inShutdownFuture = this.in.group().shutdownGracefully();
+ Future<?> outShutdownFuture = this.out.group().shutdownGracefully();
+
+ try {
+ inShutdownFuture.sync();
+ outShutdownFuture.sync();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Could not properly shutdown connections.");
+ }
+ }
+
+ public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException {
+ // Get the channel. The channel may be
+ // 1) a channel that already exists (usual case) -> just send the data
+ // 2) a channel that is in buildup (sometimes) -> attach to the future and wait for the actual channel
+ // 3) not yet existing -> establish the channel
+
+ final Object entry = this.outConnections.get(receiver);
+ final OutboundConnectionQueue channel;
+
+ if (entry != null) {
+ // existing channel or channel in buildup
+ if (entry instanceof OutboundConnectionQueue) {
+ channel = (OutboundConnectionQueue) entry;
+ }
+ else {
+ ChannelInBuildup future = (ChannelInBuildup) entry;
+ channel = future.waitForChannel();
+ }
+ }
+ else {
+ // No channel yet. Create one, but watch out for a race.
+ // We create a "buildup future" and atomically add it to the map.
+ // Only the thread that really added it establishes the channel.
+ // The others need to wait on that original establisher's future.
+ ChannelInBuildup inBuildup = new ChannelInBuildup(this.out, receiver);
+ Object old = this.outConnections.putIfAbsent(receiver, inBuildup);
+
+ if (old == null) {
+ this.out.connect(receiver.getConnectionAddress()).addListener(inBuildup);
+ channel = inBuildup.waitForChannel();
+
+ Object previous = this.outConnections.put(receiver, channel);
+
+ if (inBuildup != previous) {
+ throw new IOException("Race condition during channel build up.");
+ }
+ }
+ else if (old instanceof ChannelInBuildup) {
+ channel = ((ChannelInBuildup) old).waitForChannel();
+ }
+ else {
+ channel = (OutboundConnectionQueue) old;
+ }
+ }
+
+ channel.enqueue(envelope);
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class ChannelInBuildup implements ChannelFutureListener {
+
+ private Object lock = new Object();
+
+ private volatile OutboundConnectionQueue channel;
+
+ private volatile Throwable error;
+
+ private int numRetries = 2;
+
+ private final Bootstrap out;
+
+ private final RemoteReceiver receiver;
+
+ private ChannelInBuildup(Bootstrap out, RemoteReceiver receiver) {
+ this.out = out;
+ this.receiver = receiver;
+ }
+
+ private void handInChannel(OutboundConnectionQueue c) {
+ synchronized (this.lock) {
+ this.channel = c;
+ this.lock.notifyAll();
+ }
+ }
+
+ private void notifyOfError(Throwable error) {
+ synchronized (this.lock) {
+ this.error = error;
+ this.lock.notifyAll();
+ }
+ }
+
+ private OutboundConnectionQueue waitForChannel() throws IOException {
+ synchronized (this.lock) {
+ while (this.error == null && this.channel == null) {
+ try {
+ this.lock.wait(2000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Channel buildup interrupted.");
+ }
+ }
+ }
+
+ if (this.error != null) {
+ throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
+ }
+
+ return this.channel;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Channel %s connected", future.channel()));
+ }
+
+ handInChannel(new OutboundConnectionQueue(future.channel()));
+ }
+ else if (this.numRetries > 0) {
+ LOG.debug(String.format("Connection request did not succeed, retrying (%d attempts left)", this.numRetries));
+
+ this.out.connect(this.receiver.getConnectionAddress()).addListener(this);
+ this.numRetries--;
+ }
+ else {
+ if (future.getClass() != null) {
+ notifyOfError(future.cause());
+ }
+ else {
+ notifyOfError(new Exception("Connection could not be established."));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
new file mode 100644
index 0000000..c687408
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -0,0 +1,94 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.runtime.io.network.Envelope;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayDeque;
+
+public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implements ChannelFutureListener {
+
+ private static final Log LOG = LogFactory.getLog(OutboundConnectionQueue.class);
+
+ private final Channel channel;
+
+ private final ArrayDeque<Envelope> queuedEnvelopes;
+
+ public OutboundConnectionQueue(Channel channel) {
+ this.channel = channel;
+ this.queuedEnvelopes = new ArrayDeque<Envelope>();
+
+ channel.pipeline().addFirst(this);
+ }
+
+ /**
+ * Enqueues an envelope so be sent later.
+ * <p/>
+ * This method is always invoked by the task thread that wants the envelope sent.
+ *
+ * @param env The envelope to be sent.
+ */
+ public void enqueue(Envelope env) {
+ // the user event trigger ensure thread-safe hand-over of the envelope
+ this.channel.pipeline().fireUserEventTriggered(env);
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object envelopeToEnqueue) throws Exception {
+ boolean triggerWrite = this.queuedEnvelopes.isEmpty();
+
+ this.queuedEnvelopes.addLast((Envelope) envelopeToEnqueue);
+
+ if (triggerWrite) {
+ writeAndFlushNextEnvelopeIfPossible();
+ }
+ }
+
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ writeAndFlushNextEnvelopeIfPossible();
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ writeAndFlushNextEnvelopeIfPossible();
+ }
+ else if (future.cause() != null) {
+ exceptionOccurred(future.cause());
+ }
+ else {
+ exceptionOccurred(new Exception("Envelope send aborted."));
+ }
+ }
+
+ private void writeAndFlushNextEnvelopeIfPossible() {
+ if (this.channel.isWritable() && !this.queuedEnvelopes.isEmpty()) {
+ Envelope nextEnvelope = this.queuedEnvelopes.pollFirst();
+ this.channel.writeAndFlush(nextEnvelope).addListener(this);
+ }
+ }
+
+ private void exceptionOccurred(Throwable t) throws Exception {
+ LOG.error(String.format("An exception occurred in Channel %s: %s", this.channel, t.getMessage()));
+ throw new Exception(t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
new file mode 100644
index 0000000..424f2c0
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
@@ -0,0 +1,65 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.netty;
+
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.network.Envelope;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+@ChannelHandler.Sharable
+public class OutboundEnvelopeEncoder extends MessageToByteEncoder<Envelope> {
+
+ public static final int HEADER_SIZE = 48;
+
+ public static final int MAGIC_NUMBER = 0xBADC0FFE;
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, Envelope env, ByteBuf out) throws Exception {
+ // --------------------------------------------------------------------
+ // (1) header (48 bytes)
+ // --------------------------------------------------------------------
+ out.writeInt(MAGIC_NUMBER); // 4 bytes
+
+ if (out.getInt(out.writerIndex()-4) != MAGIC_NUMBER) {
+ throw new RuntimeException();
+ }
+
+ out.writeInt(env.getSequenceNumber()); // 4 bytes
+ env.getJobID().writeTo(out); // 16 bytes
+ env.getSource().writeTo(out); // 16 bytes
+ out.writeInt(env.getEventsSerialized() != null ? env.getEventsSerialized().remaining() : 0); // 4 bytes
+ out.writeInt(env.getBuffer() != null ? env.getBuffer().size() : 0); // 4 bytes
+ // --------------------------------------------------------------------
+ // (2) events (var length)
+ // --------------------------------------------------------------------
+ if (env.getEventsSerialized() != null) {
+ out.writeBytes(env.getEventsSerialized());
+ }
+
+ // --------------------------------------------------------------------
+ // (3) buffer (var length)
+ // --------------------------------------------------------------------
+ if (env.getBuffer() != null) {
+ Buffer buffer = env.getBuffer();
+ out.writeBytes(buffer.getMemorySegment().wrap(0, buffer.size()));
+
+ // Recycle the buffer from OUR buffer pool after everything has been
+ // copied to Nettys buffer space.
+ buffer.recycleBuffer();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java
deleted file mode 100644
index f22e6f7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.tcp;
-
-import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.network.ChannelManager;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeReader;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeReader.DeserializationState;
-import eu.stratosphere.runtime.io.network.envelope.NoBufferAvailableException;
-import eu.stratosphere.util.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SelectionKey;
-
-/**
- * An incoming TCP connection through which data is read and transformed into {@link Envelope} objects.
- */
-public class IncomingConnection {
-
- private static final Log LOG = LogFactory.getLog(IncomingConnection.class);
-
- /** Readable byte channel (TCP socket) to read data from */
- private final ReadableByteChannel channel;
-
- /** Channel manager to dispatch complete envelopes */
- private final ChannelManager channelManager;
-
- /** Envelope reader to turn the channel data into envelopes */
- private final EnvelopeReader reader;
-
- // -----------------------------------------------------------------------------------------------------------------
-
- public IncomingConnection(ReadableByteChannel channel, ChannelManager channelManager) {
- this.channel = channel;
- this.channelManager = channelManager;
- this.reader = new EnvelopeReader(channelManager);
- }
-
- // -----------------------------------------------------------------------------------------------------------------
-
- public void read() throws IOException, InterruptedException, NoBufferAvailableException {
- DeserializationState deserializationState = this.reader.readNextChunk(this.channel);
-
- switch (deserializationState) {
- case COMPLETE:
- Envelope envelope = this.reader.getFullyDeserializedTransferEnvelope();
- this.channelManager.dispatchFromNetwork(envelope);
- this.reader.reset();
- break;
-
- case NO_BUFFER_AVAILABLE:
- throw new NoBufferAvailableException(this.reader.getBufferProvider());
-
- case PENDING:
- break;
- }
- }
-
- public void reportTransmissionProblem(SelectionKey key, IOException ioe) {
- LOG.error(StringUtils.stringifyException(ioe));
-
- try {
- this.channel.close();
- } catch (IOException e) {
- LOG.debug("An error occurred while closing the byte channel");
- }
-
- if (key != null) {
- key.cancel();
- }
-
- Envelope pendingEnvelope = this.reader.getPendingEnvelope();
- if (pendingEnvelope != null) {
- if (pendingEnvelope.hasBuffer()) {
- Buffer buffer = pendingEnvelope.getBuffer();
- if (buffer != null) {
- buffer.recycleBuffer();
- }
- }
- }
-
- this.reader.reset();
- }
-
- public boolean isCloseUnexpected() {
- return this.reader.hasUnfinishedData();
- }
-
- public void closeConnection(SelectionKey key) {
- try {
- this.channel.close();
- } catch (IOException ioe) {
- LOG.error("An IOException occurred while closing the socket: + " + StringUtils.stringifyException(ioe));
- }
-
- if (key != null) {
- key.cancel();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java
deleted file mode 100644
index 774ad4e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.tcp;
-
-import eu.stratosphere.runtime.io.network.ChannelManager;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
-import eu.stratosphere.runtime.io.network.envelope.NoBufferAvailableException;
-import eu.stratosphere.util.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Queue;
-
-public class IncomingConnectionThread extends Thread {
-
- private static final Log LOG = LogFactory.getLog(IncomingConnectionThread.class);
-
- private final ChannelManager channelManager;
-
- private final Selector selector;
-
- private final Queue<SelectionKey> pendingReadEventSubscribeRequests = new ArrayDeque<SelectionKey>();
-
- private final ServerSocketChannel listeningSocket;
-
- private static final class IncomingConnectionBufferAvailListener implements BufferAvailabilityListener {
-
- private final Queue<SelectionKey> pendingReadEventSubscribeRequests;
-
- private final SelectionKey key;
-
- private IncomingConnectionBufferAvailListener(final Queue<SelectionKey> pendingReadEventSubscribeRequests,
- final SelectionKey key) {
-
- this.pendingReadEventSubscribeRequests = pendingReadEventSubscribeRequests;
- this.key = key;
- }
-
- @Override
- public void bufferAvailable() {
-
- synchronized (this.pendingReadEventSubscribeRequests) {
- this.pendingReadEventSubscribeRequests.add(this.key);
- }
- }
- }
-
- public IncomingConnectionThread(ChannelManager channelManager,
- boolean isListeningThread, InetSocketAddress listeningAddress) throws IOException {
- super("Incoming Connection Thread");
-
- this.selector = Selector.open();
- this.channelManager = channelManager;
-
- if (isListeningThread) {
- this.listeningSocket = ServerSocketChannel.open();
- this.listeningSocket.configureBlocking(false);
- listeningSocket.register(this.selector, SelectionKey.OP_ACCEPT);
- this.listeningSocket.socket().bind(listeningAddress);
- LOG.debug("Listening on " + this.listeningSocket.socket().getLocalSocketAddress());
- } else {
- this.listeningSocket = null;
- }
- }
-
- @Override
- public void run() {
- try {
- while (!this.isInterrupted()) {
-
- synchronized (this.pendingReadEventSubscribeRequests) {
- while (!this.pendingReadEventSubscribeRequests.isEmpty()) {
- final SelectionKey key = this.pendingReadEventSubscribeRequests.poll();
- final IncomingConnection incomingConnection = (IncomingConnection) key.attachment();
- final SocketChannel socketChannel = (SocketChannel) key.channel();
-
- try {
- final SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
- newKey.attach(incomingConnection);
- } catch (ClosedChannelException e) {
- incomingConnection.reportTransmissionProblem(key, e);
- }
- }
- }
-
- try {
- this.selector.select(500);
- } catch (IOException e) {
- LOG.error(e);
- }
-
- final Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
-
- while (iter.hasNext()) {
- final SelectionKey key = iter.next();
-
- iter.remove();
- if (key.isValid()) {
- if (key.isReadable()) {
- doRead(key);
- } else if (key.isAcceptable()) {
- doAccept(key);
- } else {
- LOG.error("Unknown key: " + key);
- }
- } else {
- LOG.error("Received invalid key: " + key);
- }
- }
- }
-
- // Do cleanup, if necessary
- if (this.listeningSocket != null) {
- try {
- this.listeningSocket.close();
- } catch (IOException ioe) {
- // Actually, we can ignore this exception
- LOG.debug(ioe);
- }
- }
-
- // Finally, close the selector
- try {
- this.selector.close();
- } catch (IOException ioe) {
- LOG.debug(StringUtils.stringifyException(ioe));
- }
- }
- catch (Throwable t) {
- // this is a disaster, this task manager cannot go on!
- LOG.fatal("Incoming connection thread died with an exception: " + t.getMessage(), t);
- System.exit(1);
- }
- }
-
- private void doAccept(SelectionKey key) {
-
- SocketChannel clientSocket = null;
-
- try {
- clientSocket = this.listeningSocket.accept();
- if (clientSocket == null) {
- LOG.error("Client socket is null");
- return;
- }
- } catch (IOException ioe) {
- LOG.error(ioe);
- return;
- }
-
- final IncomingConnection incomingConnection = new IncomingConnection(
- clientSocket, this.channelManager);
- SelectionKey clientKey = null;
- try {
- clientSocket.configureBlocking(false);
- clientKey = clientSocket.register(this.selector, SelectionKey.OP_READ);
- clientKey.attach(incomingConnection);
- } catch (IOException ioe) {
- incomingConnection.reportTransmissionProblem(clientKey, ioe);
- }
- }
-
- private void doRead(SelectionKey key) {
-
- final IncomingConnection incomingConnection = (IncomingConnection) key.attachment();
- try {
- incomingConnection.read();
- } catch (EOFException eof) {
- if (incomingConnection.isCloseUnexpected()) {
- final SocketChannel socketChannel = (SocketChannel) key.channel();
- LOG.error("Connection from " + socketChannel.socket().getRemoteSocketAddress()
- + " was closed unexpectedly");
- incomingConnection.reportTransmissionProblem(key, eof);
- } else {
- incomingConnection.closeConnection(key);
- }
- } catch (IOException ioe) {
- incomingConnection.reportTransmissionProblem(key, ioe);
- } catch (InterruptedException e) {
- // Nothing to do here
- } catch (NoBufferAvailableException e) {
- // There are no buffers available, unsubscribe from read event
- final SocketChannel socketChannel = (SocketChannel) key.channel();
- try {
- final SelectionKey newKey = socketChannel.register(this.selector, 0);
- newKey.attach(incomingConnection);
- } catch (ClosedChannelException e1) {
- incomingConnection.reportTransmissionProblem(key, e1);
- }
-
- final BufferAvailabilityListener bal = new IncomingConnectionBufferAvailListener(
- this.pendingReadEventSubscribeRequests, key);
- if (!e.getBufferProvider().registerBufferAvailabilityListener(bal)) {
- // In the meantime, a buffer has become available again, subscribe to read event again
-
- try {
- final SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
- newKey.attach(incomingConnection);
- } catch (ClosedChannelException e1) {
- incomingConnection.reportTransmissionProblem(key, e1);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java
deleted file mode 100644
index 7df1901..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.tcp;
-
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.network.RemoteReceiver;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeWriter;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Queue;
-
-/**
- * This class represents an outgoing TCP connection through which {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects can be sent.
- * {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects are received from the {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} and added to a queue. An
- * additional network thread then takes the envelopes from the queue and transmits them to the respective destination
- * host.
- *
- */
-public class OutgoingConnection {
-
- /**
- * The log object used to report debug information and possible errors.
- */
- private static final Log LOG = LogFactory.getLog(OutgoingConnection.class);
-
- /**
- * The address this outgoing connection is connected to.
- */
- private final RemoteReceiver remoteReceiver;
-
- /**
- * The outgoing connection thread which actually transmits the queued transfer envelopes.
- */
- private final OutgoingConnectionThread connectionThread;
-
- /**
- * The queue of transfer envelopes to be transmitted.
- */
- private final Queue<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
-
- /**
- * The {@link eu.stratosphere.runtime.io.network.envelope.Envelope} that is currently processed.
- */
- private Envelope currentEnvelope = null;
-
- /**
- * Stores whether the underlying TCP connection is established. As this variable is accessed by the byte buffered
- * channel manager and the outgoing connection thread, it must be protected by a monitor.
- */
- private boolean isConnected = false;
-
- /**
- * Stores whether is underlying TCP connection is subscribed to the NIO write event. As this variable is accessed by
- * the byte buffered channel and the outgoing connection thread, it must be protected by a monitor.
- */
- private boolean isSubscribedToWriteEvent = false;
-
- /**
- * The overall number of connection retries which shall be performed before a connection error is reported.
- */
- private final int numberOfConnectionRetries;
-
- /**
- * The number of connection retries left before an I/O error is reported.
- */
- private int retriesLeft = 0;
-
- /**
- * The timestamp of the last connection retry.
- */
- private long timstampOfLastRetry = 0;
-
- /**
- * The current selection key representing the interest set of the underlying TCP NIO connection. This variable may
- * only be accessed the the outgoing connection thread.
- */
- private SelectionKey selectionKey = null;
-
- /**
- * The period of time in milliseconds that shall be waited before a connection attempt is considered to be failed.
- */
- private static long RETRYINTERVAL = 1000L; // 1 second
-
- private EnvelopeWriter writer;
-
- /**
- * Constructs a new outgoing connection object.
- *
- * @param remoteReceiver
- * the address of the destination host this outgoing connection object is supposed to connect to
- * @param connectionThread
- * the connection thread which actually handles the network transfer
- * @param numberOfConnectionRetries
- * the number of connection retries allowed before an I/O error is reported
- */
- public OutgoingConnection(RemoteReceiver remoteReceiver, OutgoingConnectionThread connectionThread,
- int numberOfConnectionRetries) {
-
- this.remoteReceiver = remoteReceiver;
- this.connectionThread = connectionThread;
- this.numberOfConnectionRetries = numberOfConnectionRetries;
- this.writer = new EnvelopeWriter();
- }
-
- /**
- * Adds a new {@link eu.stratosphere.runtime.io.network.envelope.Envelope} to the queue of envelopes to be transmitted to the destination host of this
- * connection.
- * <p>
- * This method should only be called by the {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} object.
- *
- * @param envelope
- * the envelope to be added to the transfer queue
- */
- public void queueEnvelope(Envelope envelope) {
-
- synchronized (this.queuedEnvelopes) {
-
- checkConnection();
- this.queuedEnvelopes.add(envelope);
- }
- }
-
- private void checkConnection() {
-
- synchronized (this.queuedEnvelopes) {
-
- if (!this.isConnected) {
-
- this.retriesLeft = this.numberOfConnectionRetries;
- this.timstampOfLastRetry = System.currentTimeMillis();
- this.connectionThread.triggerConnect(this);
- this.isConnected = true;
- this.isSubscribedToWriteEvent = true;
- } else {
-
- if (!this.isSubscribedToWriteEvent) {
- this.connectionThread.subscribeToWriteEvent(this.selectionKey);
- this.isSubscribedToWriteEvent = true;
- }
- }
-
- }
- }
-
- /**
- * Returns the {@link InetSocketAddress} to the destination host this outgoing connection is supposed to be
- * connected to.
- * <p>
- * This method should be called by the {@link OutgoingConnectionThread} object only.
- *
- * @return the {@link InetSocketAddress} to the destination host this outgoing connection is supposed to be
- * connected to
- */
- public InetSocketAddress getConnectionAddress() {
-
- return this.remoteReceiver.getConnectionAddress();
- }
-
- /**
- * Reports a problem which occurred while establishing the underlying TCP connection to this outgoing connection
- * object. Depending on the number of connection retries left, this method will either try to reestablish the TCP
- * connection or report an I/O error to all tasks which have queued envelopes for this connection. In the latter
- * case all queued envelopes will be dropped and all included buffers will be freed.
- * <p>
- * This method should only be called by the {@link OutgoingConnectionThread} object.
- *
- * @param ioe
- * thrown if an error occurs while reseting the underlying TCP connection
- */
- public void reportConnectionProblem(IOException ioe) {
-
- // First, write exception to log
- final long currentTime = System.currentTimeMillis();
- if (currentTime - this.timstampOfLastRetry >= RETRYINTERVAL) {
- LOG.error("Cannot connect to " + this.remoteReceiver + ", " + this.retriesLeft + " retries left");
- }
-
- synchronized (this.queuedEnvelopes) {
-
- if (this.selectionKey != null) {
-
- final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
- if (socketChannel != null) {
- try {
- socketChannel.close();
- } catch (IOException e) {
- LOG.debug("Error while trying to close the socket channel to " + this.remoteReceiver);
- }
- }
-
- this.selectionKey.cancel();
- this.selectionKey = null;
- this.isConnected = false;
- this.isSubscribedToWriteEvent = false;
- }
-
- if (hasRetriesLeft(currentTime)) {
- this.connectionThread.triggerConnect(this);
- this.isConnected = true;
- this.isSubscribedToWriteEvent = true;
- return;
- }
-
- // Error is fatal
- LOG.error(ioe);
-
- // Notify source of current envelope and release buffer
- if (this.currentEnvelope != null) {
- if (this.currentEnvelope.getBuffer() != null) {
- this.currentEnvelope.getBuffer().recycleBuffer();
- this.currentEnvelope = null;
- }
- }
-
- // Notify all other tasks which are waiting for data to be transmitted
- final Iterator<Envelope> iter = this.queuedEnvelopes.iterator();
- while (iter.hasNext()) {
- final Envelope envelope = iter.next();
- iter.remove();
- // Recycle the buffer inside the envelope
- if (envelope.getBuffer() != null) {
- envelope.getBuffer().recycleBuffer();
- }
- }
-
- this.queuedEnvelopes.clear();
- }
- }
-
- /**
- * Reports an I/O error which occurred while writing data to the TCP connection. As a result of the I/O error the
- * connection is closed and the interest keys are canceled. Moreover, the task which queued the currently
- * transmitted transfer envelope is notified about the error and the current envelope is dropped. If the current
- * envelope contains a buffer, the buffer is freed.
- * <p>
- * This method should only be called by the {@link OutgoingConnectionThread} object.
- *
- * @param ioe
- * thrown if an error occurs while reseting the connection
- */
- public void reportTransmissionProblem(IOException ioe) {
-
- final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
-
- // First, write exception to log
- if (this.currentEnvelope != null) {
- LOG.error("The connection between " + socketChannel.socket().getLocalAddress() + " and "
- + socketChannel.socket().getRemoteSocketAddress()
- + " experienced an IOException for transfer envelope " + this.currentEnvelope.getSequenceNumber());
- } else {
- LOG.error("The connection between " + socketChannel.socket().getLocalAddress() + " and "
- + socketChannel.socket().getRemoteSocketAddress() + " experienced an IOException");
- }
-
- // Close the connection and cancel the interest key
- synchronized (this.queuedEnvelopes) {
- try {
- LOG.debug("Closing connection to " + socketChannel.socket().getRemoteSocketAddress());
- socketChannel.close();
- } catch (IOException e) {
- LOG.debug("An error occurred while responding to an IOException");
- LOG.debug(e);
- }
-
- this.selectionKey.cancel();
-
- // Error is fatal
- LOG.error(ioe);
-
- // Trigger new connection if there are more envelopes to be transmitted
- if (this.queuedEnvelopes.isEmpty()) {
- this.isConnected = false;
- this.isSubscribedToWriteEvent = false;
- } else {
- this.connectionThread.triggerConnect(this);
- this.isConnected = true;
- this.isSubscribedToWriteEvent = true;
- }
-
- // We must assume the current envelope is corrupted so we notify the task which created it.
- if (this.currentEnvelope != null) {
- if (this.currentEnvelope.getBuffer() != null) {
- this.currentEnvelope.getBuffer().recycleBuffer();
- this.currentEnvelope = null;
- }
- }
- }
- }
-
- /**
- * Checks whether further retries are left for establishing the underlying TCP connection.
- *
- * @param currentTime
- * the current system time in milliseconds since January 1st, 1970
- * @return <code>true</code> if there are retries left, <code>false</code> otherwise
- */
- private boolean hasRetriesLeft(long currentTime) {
-
- if (currentTime - this.timstampOfLastRetry >= RETRYINTERVAL) {
- this.retriesLeft--;
- this.timstampOfLastRetry = currentTime;
- if (this.retriesLeft == 0) {
- return false;
- }
- }
-
- return true;
- }
-
- /**
- * Writes the content of the current {@link eu.stratosphere.runtime.io.network.envelope.Envelope} object to the underlying TCP connection.
- * <p>
- * This method should only be called by the {@link OutgoingConnectionThread} object.
- *
- * @return <code>true</code> if there is more data from this/other queued envelopes to be written to this channel
- * @throws IOException
- * thrown if an error occurs while writing the data to the channel
- */
- public boolean write() throws IOException {
-
- final WritableByteChannel writableByteChannel = (WritableByteChannel) this.selectionKey.channel();
-
- if (this.currentEnvelope == null) {
- synchronized (this.queuedEnvelopes) {
- if (this.queuedEnvelopes.isEmpty()) {
- return false;
- } else {
- this.currentEnvelope = this.queuedEnvelopes.peek();
-
- this.writer.setEnvelopeForWriting(this.currentEnvelope);
- }
- }
- }
-
- if (!this.writer.writeNextChunk(writableByteChannel)) {
- // Make sure we recycle the attached memory or file buffers correctly
- if (this.currentEnvelope.getBuffer() != null) {
- this.currentEnvelope.getBuffer().recycleBuffer();
- }
-
- synchronized (this.queuedEnvelopes) {
- this.queuedEnvelopes.poll();
- this.currentEnvelope = null;
- }
- }
-
- return true;
- }
-
- /**
- * Requests to close the underlying TCP connection. The request is ignored if at least one {@link eu.stratosphere.runtime.io.network.envelope.Envelope}
- * is queued.
- * <p>
- * This method should only be called by the {@link OutgoingConnectionThread} object.
- *
- * @throws IOException
- * thrown if an error occurs while closing the TCP connection
- */
- public void requestClose() throws IOException {
-
- synchronized (this.queuedEnvelopes) {
-
- if (this.queuedEnvelopes.isEmpty()) {
-
- if (this.isSubscribedToWriteEvent) {
-
- this.connectionThread.unsubscribeFromWriteEvent(this.selectionKey);
- this.isSubscribedToWriteEvent = false;
- }
- }
- }
- }
-
- /**
- * Closes the underlying TCP connection if no more {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects are in the transmission queue.
- * <p>
- * This method should only be called by the {@link OutgoingConnectionThread} object.
- *
- * @throws IOException
- */
- public void closeConnection() throws IOException {
-
- synchronized (this.queuedEnvelopes) {
-
- if (!this.queuedEnvelopes.isEmpty()) {
- return;
- }
-
- if (this.selectionKey != null) {
-
- final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
- socketChannel.close();
- this.selectionKey.cancel();
- this.selectionKey = null;
- }
-
- this.isConnected = false;
- this.isSubscribedToWriteEvent = false;
- }
- }
-
- /**
- * Returns the number of queued {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects with the given source channel ID.
- *
- * @param sourceChannelID
- * the source channel ID to count the queued envelopes for
- * @return the number of queued transfer envelopes with the given source channel ID
- */
- public int getNumberOfQueuedEnvelopesFromChannel(final ChannelID sourceChannelID) {
-
- synchronized (this.queuedEnvelopes) {
-
- int number = 0;
-
- final Iterator<Envelope> it = this.queuedEnvelopes.iterator();
- while (it.hasNext()) {
- final Envelope te = it.next();
- if (sourceChannelID.equals(te.getSource())) {
- number++;
- }
- }
-
- return number;
- }
- }
-
- /**
- * Removes all queued {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects from the transmission which match the given source channel
- * ID.
- *
- * @param sourceChannelID
- * the source channel ID of the transfered transfer envelopes to be dropped
- */
- public void dropAllQueuedEnvelopesFromChannel(final ChannelID sourceChannelID) {
-
- synchronized (this.queuedEnvelopes) {
-
- final Iterator<Envelope> it = this.queuedEnvelopes.iterator();
- while (it.hasNext()) {
- final Envelope te = it.next();
- if (sourceChannelID.equals(te.getSource())) {
- it.remove();
- if (te.getBuffer() != null) {
- te.getBuffer().recycleBuffer();
- }
- }
- }
- }
- }
-
- /**
- * Checks whether this outgoing connection object manages an active connection or can be removed by the
- * {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} object.
- * <p>
- * This method should only be called by the byte buffered channel manager.
- *
- * @return <code>true</code> if this object is no longer manages an active connection and can be removed,
- * <code>false</code> otherwise.
- */
- public boolean canBeRemoved() {
-
- synchronized (this.queuedEnvelopes) {
-
- if (this.isConnected) {
- return false;
- }
-
- if (this.currentEnvelope != null) {
- return false;
- }
-
- return this.queuedEnvelopes.isEmpty();
- }
- }
-
- /**
- * Sets the selection key representing the interest set of the underlying TCP NIO connection.
- *
- * @param selectionKey
- * the selection of the underlying TCP connection
- */
- public void setSelectionKey(SelectionKey selectionKey) {
- this.selectionKey = selectionKey;
- }
-
- /**
- * Returns the number of currently queued envelopes which contain a write buffer.
- *
- * @return the number of currently queued envelopes which contain a write buffer
- */
- public int getNumberOfQueuedWriteBuffers() {
-
- int retVal = 0;
-
- synchronized (this.queuedEnvelopes) {
-
- final Iterator<Envelope> it = this.queuedEnvelopes.iterator();
- while (it.hasNext()) {
-
- final Envelope envelope = it.next();
- if (envelope.getBuffer() != null) {
- ++retVal;
- }
- }
- }
-
- return retVal;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java
deleted file mode 100644
index dde26e3..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.tcp;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Queue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.util.StringUtils;
-
-public class OutgoingConnectionThread extends Thread {
-
- /**
- * The minimum time a TCP connection must be idle it is closed.
- */
- private static final long MIN_IDLE_TIME_BEFORE_CLOSE = 80000L; // 80 seconds
-
- private static final Log LOG = LogFactory.getLog(OutgoingConnectionThread.class);
-
- private final Selector selector;
-
- private final Queue<OutgoingConnection> pendingConnectionRequests = new ArrayDeque<OutgoingConnection>();
-
- private final Queue<SelectionKey> pendingWriteEventSubscribeRequests = new ArrayDeque<SelectionKey>();
-
- private final Map<OutgoingConnection, Long> connectionsToClose = new HashMap<OutgoingConnection, Long>();
-
- public OutgoingConnectionThread() throws IOException {
- super("Outgoing Connection Thread");
-
- this.selector = Selector.open();
- }
-
-
- @Override
- public void run() {
- try {
- while (!isInterrupted()) {
-
- synchronized (this.pendingConnectionRequests) {
-
- if (!this.pendingConnectionRequests.isEmpty()) {
-
- final OutgoingConnection outgoingConnection = this.pendingConnectionRequests.poll();
- try {
- final SocketChannel socketChannel = SocketChannel.open();
- socketChannel.configureBlocking(false);
- final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
- socketChannel.connect(outgoingConnection.getConnectionAddress());
- key.attach(outgoingConnection);
- } catch (final IOException ioe) {
- // IOException is reported by separate thread to avoid deadlocks
- final Runnable reporterThread = new Runnable() {
-
- @Override
- public void run() {
- outgoingConnection.reportConnectionProblem(ioe);
- }
- };
- new Thread(reporterThread).start();
- }
- }
- }
-
- synchronized (this.pendingWriteEventSubscribeRequests) {
-
- if (!this.pendingWriteEventSubscribeRequests.isEmpty()) {
- final SelectionKey oldSelectionKey = this.pendingWriteEventSubscribeRequests.poll();
- final OutgoingConnection outgoingConnection = (OutgoingConnection) oldSelectionKey.attachment();
- final SocketChannel socketChannel = (SocketChannel) oldSelectionKey.channel();
-
- try {
- final SelectionKey newSelectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ
- | SelectionKey.OP_WRITE);
- newSelectionKey.attach(outgoingConnection);
- outgoingConnection.setSelectionKey(newSelectionKey);
- } catch (final IOException ioe) {
- // IOException is reported by separate thread to avoid deadlocks
- final Runnable reporterThread = new Runnable() {
-
- @Override
- public void run() {
- outgoingConnection.reportTransmissionProblem(ioe);
- }
- };
- new Thread(reporterThread).start();
- }
- }
- }
-
- synchronized (this.connectionsToClose) {
-
- final Iterator<Map.Entry<OutgoingConnection, Long>> closeIt = this.connectionsToClose.entrySet()
- .iterator();
- final long now = System.currentTimeMillis();
- while (closeIt.hasNext()) {
-
- final Map.Entry<OutgoingConnection, Long> entry = closeIt.next();
- if ((entry.getValue().longValue() + MIN_IDLE_TIME_BEFORE_CLOSE) < now) {
- final OutgoingConnection outgoingConnection = entry.getKey();
- closeIt.remove();
- // Create new thread to close connection to avoid deadlocks
- final Runnable closeThread = new Runnable() {
-
- @Override
- public void run() {
- try {
- outgoingConnection.closeConnection();
- } catch (IOException ioe) {
- outgoingConnection.reportTransmissionProblem(ioe);
- }
- }
- };
-
- new Thread(closeThread).start();
- }
-
- }
- }
-
- try {
- this.selector.select(10);
- } catch (IOException e) {
- LOG.error(e);
- }
-
- final Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
-
- while (iter.hasNext()) {
- final SelectionKey key = iter.next();
-
- iter.remove();
- if (key.isValid()) {
- if (key.isConnectable()) {
- doConnect(key);
- } else {
- if (key.isReadable()) {
- doRead(key);
- // A read will always result in an exception, so the write key will not be valid anymore
- continue;
- }
- if (key.isWritable()) {
- doWrite(key);
- }
- }
- } else {
- LOG.error("Received invalid key: " + key);
- }
- }
- }
-
- // Finally, try to close the selector
- try {
- this.selector.close();
- } catch (IOException ioe) {
- LOG.debug(StringUtils.stringifyException(ioe));
- }
- }
- catch (Throwable t) {
- // this is a disaster, this task manager cannot go on!
- LOG.fatal("Outgoing connection thread died with an exception: " + t.getMessage(), t);
- System.exit(1);
- }
- }
-
- private void doConnect(SelectionKey key) {
-
- final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
- final SocketChannel socketChannel = (SocketChannel) key.channel();
- try {
- while (!socketChannel.finishConnect()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e1) {
- LOG.error(e1);
- }
- }
-
- final SelectionKey channelKey = socketChannel.register(selector, SelectionKey.OP_WRITE
- | SelectionKey.OP_READ);
- outgoingConnection.setSelectionKey(channelKey);
- channelKey.attach(outgoingConnection);
-
- } catch (IOException ioe) {
- outgoingConnection.reportConnectionProblem(ioe);
- }
- }
-
- private void doWrite(SelectionKey key) {
-
- final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
-
- try {
-
- if (!outgoingConnection.write()) {
- // Try to close the connection
- outgoingConnection.requestClose();
- }
-
- } catch (IOException ioe) {
- outgoingConnection.reportTransmissionProblem(ioe);
- }
- }
-
- private void doRead(SelectionKey key) {
-
- final SocketChannel socketChannel = (SocketChannel) key.channel();
- final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
- final ByteBuffer buf = ByteBuffer.allocate(8);
-
- try {
-
- if (socketChannel.read(buf) == -1) {
- outgoingConnection.reportTransmissionProblem(new IOException(
- "Read unexpected EOF from channel"));
- } else {
- LOG.error("Outgoing connection read real data from channel");
- }
- } catch (IOException ioe) {
- outgoingConnection.reportTransmissionProblem(ioe);
- }
- }
-
- public void triggerConnect(OutgoingConnection outgoingConnection) {
-
- synchronized (this.pendingConnectionRequests) {
- this.pendingConnectionRequests.add(outgoingConnection);
- }
- }
-
- public void unsubscribeFromWriteEvent(SelectionKey selectionKey) throws IOException {
-
- final SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
- final OutgoingConnection outgoingConnection = (OutgoingConnection) selectionKey.attachment();
-
- final SelectionKey newSelectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
- newSelectionKey.attach(outgoingConnection);
- outgoingConnection.setSelectionKey(newSelectionKey);
-
- synchronized (this.connectionsToClose) {
- this.connectionsToClose.put(outgoingConnection, Long.valueOf(System.currentTimeMillis()));
- }
- }
-
- public void subscribeToWriteEvent(SelectionKey selectionKey) {
-
- synchronized (this.pendingWriteEventSubscribeRequests) {
- this.pendingWriteEventSubscribeRequests.add(selectionKey);
- }
- synchronized (this.connectionsToClose) {
- this.connectionsToClose.remove((OutgoingConnection) selectionKey.attachment());
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java
index f4c8aec..696915f 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -15,9 +15,6 @@ package eu.stratosphere.runtime.io.serialization;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.runtime.io.serialization.DataInputDeserializer;
-import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
-import eu.stratosphere.runtime.io.serialization.RecordDeserializer;
import java.io.DataInput;
import java.io.EOFException;
@@ -62,6 +59,7 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
// check if we can get a full length;
if (nonSpanningRemaining >= 4) {
int len = this.nonSpanningWrapper.readInt();
+
if (len <= nonSpanningRemaining - 4) {
// we can get a full record from here
target.read(this.nonSpanningWrapper);
@@ -156,8 +154,9 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
@Override
public final void readFully(byte[] b, int off, int len) throws IOException {
- if (off < 0 || len < 0 || off + len > b.length)
+ if (off < 0 || len < 0 || off + len > b.length) {
throw new IndexOutOfBoundsException();
+ }
this.segment.get(this.position, b, off, len);
this.position += len;
@@ -230,14 +229,16 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
try {
int b;
while ((b = readUnsignedByte()) != '\n') {
- if (b != '\r')
+ if (b != '\r') {
bld.append((char) b);
+ }
}
}
catch (EOFException eofex) {}
- if (bld.length() == 0)
+ if (bld.length() == 0) {
return null;
+ }
// trim a trailing carriage return
int len = bld.length();
@@ -275,8 +276,9 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
while (count < utflen) {
c = (int) bytearr[count] & 0xff;
- if (c > 127)
+ if (c > 127) {
break;
+ }
count++;
chararr[chararr_count++] = (char) c;
}
@@ -298,21 +300,25 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
case 12:
case 13:
count += 2;
- if (count > utflen)
+ if (count > utflen) {
throw new UTFDataFormatException("malformed input: partial character at end");
+ }
char2 = (int) bytearr[count - 1];
- if ((char2 & 0xC0) != 0x80)
+ if ((char2 & 0xC0) != 0x80) {
throw new UTFDataFormatException("malformed input around byte " + count);
+ }
chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
break;
case 14:
count += 3;
- if (count > utflen)
+ if (count > utflen) {
throw new UTFDataFormatException("malformed input: partial character at end");
+ }
char2 = (int) bytearr[count - 2];
char3 = (int) bytearr[count - 1];
- if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+ }
chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
break;
default:
@@ -325,8 +331,9 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
@Override
public final int skipBytes(int n) throws IOException {
- if (n < 0)
+ if (n < 0) {
throw new IllegalArgumentException();
+ }
int toSkip = Math.min(n, remaining());
this.position += toSkip;
@@ -390,6 +397,7 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
return;
} else {
this.recordLength = this.lengthBuffer.getInt(0);
+
this.lengthBuffer.clear();
segmentPosition = toPut;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java
index e6479fe..a8a53fe 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java
@@ -63,11 +63,13 @@ public class DataInputDeserializer implements DataInput {
}
public void setBuffer(byte[] buffer, int start, int len) {
- if (buffer == null)
+ if (buffer == null) {
throw new NullPointerException();
+ }
- if (start < 0 || len < 0 || start + len >= buffer.length)
+ if (start < 0 || len < 0 || start + len >= buffer.length) {
throw new IllegalArgumentException();
+ }
this.buffer = buffer;
this.position = start;
@@ -144,7 +146,7 @@ public class DataInputDeserializer implements DataInput {
@SuppressWarnings("restriction")
int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
if (LITTLE_ENDIAN) {
- value = Integer.reverseBytes(value);
+ value = Integer.reverseBytes(value);
}
this.position += 4;
@@ -183,7 +185,7 @@ public class DataInputDeserializer implements DataInput {
@SuppressWarnings("restriction")
long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
if (LITTLE_ENDIAN) {
- value = Long.reverseBytes(value);
+ value = Long.reverseBytes(value);
}
this.position += 8;
return value;
@@ -215,8 +217,9 @@ public class DataInputDeserializer implements DataInput {
while (count < utflen) {
c = (int) bytearr[count] & 0xff;
- if (c > 127)
+ if (c > 127) {
break;
+ }
count++;
chararr[chararr_count++] = (char) c;
}
@@ -240,22 +243,26 @@ public class DataInputDeserializer implements DataInput {
case 13:
/* 110x xxxx 10xx xxxx */
count += 2;
- if (count > utflen)
+ if (count > utflen) {
throw new UTFDataFormatException("malformed input: partial character at end");
+ }
char2 = (int) bytearr[count - 1];
- if ((char2 & 0xC0) != 0x80)
+ if ((char2 & 0xC0) != 0x80) {
throw new UTFDataFormatException("malformed input around byte " + count);
+ }
chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
break;
case 14:
/* 1110 xxxx 10xx xxxx 10xx xxxx */
count += 3;
- if (count > utflen)
+ if (count > utflen) {
throw new UTFDataFormatException("malformed input: partial character at end");
+ }
char2 = (int) bytearr[count - 2];
char3 = (int) bytearr[count - 1];
- if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+ }
chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java
index b5171b9..ce088f0 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java
@@ -196,9 +196,9 @@ public class DataOutputSerializer implements DataOutput {
}
}
- if (utflen > 65535)
+ if (utflen > 65535) {
throw new UTFDataFormatException("Encoded string is too long: " + utflen);
-
+ }
else if (this.position > this.buffer.length - utflen - 2) {
resize(utflen + 2);
}
@@ -212,8 +212,9 @@ public class DataOutputSerializer implements DataOutput {
int i = 0;
for (i = 0; i < strlen; i++) {
c = str.charAt(i);
- if (!((c >= 0x0001) && (c <= 0x007F)))
+ if (!((c >= 0x0001) && (c <= 0x007F))) {
break;
+ }
bytearr[count++] = (byte) c;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java
index 443f8d8..b54496a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java
@@ -68,6 +68,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
// write data and length
record.write(this.serializationBuffer);
+
this.lengthBuffer.putInt(0, this.serializationBuffer.length());
this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
@@ -103,8 +104,9 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
* @param source the {@link ByteBuffer} to copy data from
*/
private void copyToTargetBufferFrom(ByteBuffer source) {
- if (this.targetBuffer == null)
+ if (this.targetBuffer == null) {
return;
+ }
int needed = source.remaining();
int available = this.limit - this.position;
@@ -127,8 +129,9 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
@Override
public Buffer getCurrentBuffer() {
- if (targetBuffer == null)
+ if (targetBuffer == null) {
return null;
+ }
this.targetBuffer.limitSize(this.position);
return this.targetBuffer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
index 09b244f..3b2ad69 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/TestBufferProvider.java
@@ -70,7 +70,7 @@ public class TestBufferProvider implements BufferProvider {
}
@Override
- public boolean registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) {
+ public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener bufferAvailabilityListener) {
throw new UnsupportedOperationException();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
index c66d821..13fbfbc 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java
@@ -33,7 +33,6 @@ import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.pact.runtime.shipping.OutputEmitter;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
index 0b968d8..2bd5d98 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/RecordOutputEmitterTest.java
@@ -28,8 +28,6 @@ import org.junit.Test;
import eu.stratosphere.api.common.distributions.DataDistribution;
import eu.stratosphere.api.common.distributions.UniformIntegerDistribution;
import eu.stratosphere.runtime.io.api.ChannelSelector;
-import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparator;
-import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
import eu.stratosphere.pact.runtime.shipping.RecordOutputEmitter;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
index a397312..b715a4e 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/MockEnvironment.java
@@ -44,7 +44,9 @@ import eu.stratosphere.util.MutableObjectIterator;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.FutureTask;
public class MockEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner {
@@ -124,8 +126,8 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
}
@Override
- public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
- return false;
+ public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+ return BufferAvailabilityRegistration.NOT_REGISTERED_BUFFER_POOL_DESTROYED;
}
@Override
@@ -338,4 +340,9 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
public BufferProvider getOutputBufferProvider() {
return this;
}
+
+ @Override
+ public Map<String, FutureTask<Path>> getCopyTask() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
deleted file mode 100644
index b8914a8..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReaderWriterTest.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.network.envelope;
-
-import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.util.DiscardingRecycler;
-import eu.stratosphere.nephele.util.TestBufferProvider;
-import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
-import eu.stratosphere.runtime.io.BufferRecycler;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-public class EnvelopeReaderWriterTest {
-
- private final long RANDOM_SEED = 520346508276087l;
-
- private static final int BUFFER_SIZE = 32768;
-
- private static final byte BUFFER_CONTENT = 13;
-
- private final int[] BUFFER_SIZES = { 0, 2, BUFFER_SIZE, 3782, 88, 0, 23};
-
- private final AbstractEvent[][] EVENT_LISTS = {
- {},
- {},
- {},
- { new TestEvent1(34872527) },
- { new TestEvent1(8749653), new TestEvent1(365345) },
- { new TestEvent2(34563456), new TestEvent1(598432), new TestEvent2(976293845) },
- {}
- };
-
- @Test
- public void testWriteAndRead() {
-
- Assert.assertTrue("Test broken.", BUFFER_SIZES.length == EVENT_LISTS.length);
-
- File testFile = null;
- RandomAccessFile raf = null;
- try {
- testFile = File.createTempFile("envelopes", ".tmp");
- raf = new RandomAccessFile(testFile, "rw");
-
- // write
- FileChannel c = raf.getChannel();
- writeEnvelopes(c);
-
- // read
- c.position(0);
- readEnvelopes(c, -1.0f);
- c.close();
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- finally {
- if (raf != null)
- try { raf.close(); } catch (Throwable t) {}
-
- if (testFile != null)
- testFile.delete();
- }
- }
-
- @Test
- public void testWriteAndReadChunked() {
-
- Assert.assertTrue("Test broken.", BUFFER_SIZES.length == EVENT_LISTS.length);
-
- File testFile = null;
- RandomAccessFile raf = null;
- try {
- testFile = File.createTempFile("envelopes", ".tmp");
- raf = new RandomAccessFile(testFile, "rw");
-
- // write
- FileChannel c = raf.getChannel();
- writeEnvelopes(new ChunkedWriteableChannel(c));
-
- // read
- c.position(0);
- readEnvelopes(new ChunkedReadableChannel(c), 0.75f);
- c.close();
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- finally {
- if (raf != null)
- try { raf.close(); } catch (Throwable t) {}
-
- if (testFile != null)
- testFile.delete();
- }
- }
-
- private void writeEnvelopes(WritableByteChannel channel) throws IOException {
-
- final BufferRecycler recycler = new DiscardingRecycler();
- final Random rand = new Random(RANDOM_SEED);
-
- final EnvelopeWriter serializer = new EnvelopeWriter();
-
- final int NUM_ENVS = BUFFER_SIZES.length;
-
- for (int i = 0; i < NUM_ENVS; i++) {
- int seqNum = Math.abs(rand.nextInt());
- JobID jid = new JobID(rand.nextLong(), rand.nextLong());
- ChannelID sid = new ChannelID(rand.nextLong(), rand.nextLong());
-
- Envelope env = new Envelope(seqNum, jid, sid);
- if (EVENT_LISTS[i].length > 0) {
- env.serializeEventList(Arrays.asList(EVENT_LISTS[i]));
- }
-
- int bufferSize = BUFFER_SIZES[i];
- if (bufferSize > 0) {
- MemorySegment ms = new MemorySegment(new byte[BUFFER_SIZE]);
- for (int x = 0; x < bufferSize; x++) {
- ms.put(x, BUFFER_CONTENT);
- }
-
- Buffer mb = new Buffer(ms, bufferSize, recycler);
- env.setBuffer(mb);
- }
-
- serializer.setEnvelopeForWriting(env);
-
- while (serializer.writeNextChunk(channel));
- }
- }
-
- private void readEnvelopes(ReadableByteChannel channel, float probabilityForNoBufferCurrently) throws IOException {
-
- final Random rand = new Random(RANDOM_SEED);
-
- final EnvelopeReader reader = new EnvelopeReader(new OneForAllBroker(BUFFER_SIZE, probabilityForNoBufferCurrently));
-
- final int NUM_ENVS = BUFFER_SIZES.length;
-
- for (int i = 0; i < NUM_ENVS; i++) {
- int expectedSeqNum = Math.abs(rand.nextInt());
- JobID expectedJid = new JobID(rand.nextLong(), rand.nextLong());
- ChannelID expectedSid = new ChannelID(rand.nextLong(), rand.nextLong());
-
- // read the next envelope
- while (reader.readNextChunk(channel) != EnvelopeReader.DeserializationState.COMPLETE);
- Envelope env = reader.getFullyDeserializedTransferEnvelope();
-
- // check the basic fields from the header
- Assert.assertEquals(expectedSeqNum, env.getSequenceNumber());
- Assert.assertEquals(expectedJid, env.getJobID());
- Assert.assertEquals(expectedSid, env.getSource());
-
- // check the events
- List<? extends AbstractEvent> events = env.deserializeEvents();
- Assert.assertEquals(EVENT_LISTS[i].length, events.size());
-
- for (int n = 0; n < EVENT_LISTS[i].length; n++) {
- AbstractEvent expectedEvent = EVENT_LISTS[i][n];
- AbstractEvent actualEvent = events.get(n);
-
- Assert.assertEquals(expectedEvent.getClass(), actualEvent.getClass());
- Assert.assertEquals(expectedEvent, actualEvent);
- }
-
- // check the buffer
- Buffer buf = env.getBuffer();
- if (buf == null) {
- Assert.assertTrue(BUFFER_SIZES[i] == 0);
- } else {
- Assert.assertEquals(BUFFER_SIZES[i], buf.size());
- for (int k = 0; k < BUFFER_SIZES[i]; k++) {
- Assert.assertEquals(BUFFER_CONTENT, buf.getMemorySegment().get(k));
- }
- }
-
- reader.reset();
- }
-
- }
-
-
- public static final class TestEvent1 extends AbstractEvent {
-
- private long id;
-
- public TestEvent1() {}
-
- public TestEvent1(long id) {
- this.id = id;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(id);
- }
-
- @Override
- public void read(DataInput in) throws IOException {
- id = in.readLong();
- }
-
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == TestEvent1.class) {
- return ((TestEvent1) obj).id == this.id;
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return ((int) id) ^ ((int) (id >>> 32));
- }
-
- @Override
- public String toString() {
- return "TestEvent1 (" + id + ")";
- }
- }
-
- public static final class TestEvent2 extends AbstractEvent {
-
- private long id;
-
- public TestEvent2() {}
-
- public TestEvent2(long id) {
- this.id = id;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(id);
- }
-
- @Override
- public void read(DataInput in) throws IOException {
- id = in.readLong();
- }
-
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == TestEvent2.class) {
- return ((TestEvent2) obj).id == this.id;
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return ((int) id) ^ ((int) (id >>> 32));
- }
-
- @Override
- public String toString() {
- return "TestEvent2 (" + id + ")";
- }
- }
-
- private static final class ChunkedWriteableChannel implements WritableByteChannel {
-
- private final WritableByteChannel delegate;
-
- private final Random rnd;
-
- private ChunkedWriteableChannel(WritableByteChannel delegate) {
- this.delegate = delegate;
- this.rnd = new Random();
- }
-
- @Override
- public boolean isOpen() {
- return this.delegate.isOpen();
- }
-
- @Override
- public void close() throws IOException {
- this.delegate.close();
- }
-
- @Override
- public int write(ByteBuffer src) throws IOException {
- final int available = src.remaining();
- final int oldLimit = src.limit();
-
- int toWrite = rnd.nextInt(available) + 1;
- toWrite = Math.min(Math.max(toWrite, 8), available);
-
- src.limit(src.position() + toWrite);
-
- int written = this.delegate.write(src);
-
- src.limit(oldLimit);
-
- return written;
- }
- }
-
- private static final class ChunkedReadableChannel implements ReadableByteChannel {
-
- private final ReadableByteChannel delegate;
-
- private final Random rnd;
-
- private ChunkedReadableChannel(ReadableByteChannel delegate) {
- this.delegate = delegate;
- this.rnd = new Random();
- }
-
- @Override
- public boolean isOpen() {
- return this.delegate.isOpen();
- }
-
- @Override
- public void close() throws IOException {
- this.delegate.close();
- }
-
- @Override
- public int read(ByteBuffer dst) throws IOException {
- final int available = dst.remaining();
- final int oldLimit = dst.limit();
-
- int toRead = rnd.nextInt(available) + 1;
- toRead = Math.min(Math.max(toRead, 8), available);
-
- dst.limit(dst.position() + toRead);
-
- int read = this.delegate.read(dst);
-
- dst.limit(oldLimit);
-
- return read;
- }
- }
-
- private static final class OneForAllBroker implements BufferProviderBroker {
-
- private final TestBufferProvider provider;
-
- private OneForAllBroker(int sizeOfMemorySegments) {
- this.provider = new TestBufferProvider(sizeOfMemorySegments);
- }
-
- private OneForAllBroker(int sizeOfMemorySegments, float probabilityForNoBufferCurrently) {
- this.provider = new TestBufferProvider(sizeOfMemorySegments, probabilityForNoBufferCurrently);
- }
-
- @Override
- public BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) {
- return this.provider;
- }
- }
-}