You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/03/06 05:18:29 UTC
svn commit: r634153 - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/ipc/
src/java/org/apache/hadoop/net/ src/test/org/apache/hadoop/net/
Author: rangadi
Date: Wed Mar 5 20:18:26 2008
New Revision: 634153
URL: http://svn.apache.org/viewvc?rev=634153&view=rev
Log:
HADOOP-2346. Utilities to support timeout while writing to sockets.
DFSClient and DataNode sockets have 10min write timeout. (rangadi)
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java
hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java
hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java
hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java
Removed:
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java
hadoop/core/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 5 20:18:26 2008
@@ -42,6 +42,9 @@
config params to map records to different output files.
(Runping Qi via cdouglas)
+ HADOOP-2346. Utilities to support timeout while writing to sockets.
+ DFSClient and DataNode sockets have 10min write timeout. (rangadi)
+
IMPROVEMENTS
HADOOP-2655. Copy on write for data and metadata files in the
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockCrcUpgrade.java Wed Mar 5 20:18:26 2008
@@ -36,6 +36,7 @@
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.zip.CRC32;
import java.util.concurrent.*;
@@ -181,7 +182,7 @@
Socket dnSock = null;
try {
InetSocketAddress dnAddr = NetUtils.createSocketAddr(dn.getName());
- dnSock = new Socket();
+ dnSock = SocketChannel.open().socket();
dnSock.connect(dnAddr, FSConstants.READ_TIMEOUT);
dnSock.setSoTimeout(FSConstants.READ_TIMEOUT);
DFSClient.BlockReader reader = DFSClient.BlockReader.newBlockReader
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Mar 5 20:18:26 2008
@@ -862,22 +862,27 @@
throws IOException {
// in and out will be closed when sock is closed (by the caller)
DataOutputStream out = new DataOutputStream(
- new BufferedOutputStream(sock.getOutputStream()));
-
- //write the header.
- out.writeShort( DATA_TRANSFER_VERSION );
- out.write( OP_READ_BLOCK );
- out.writeLong( blockId );
- out.writeLong( startOffset );
- out.writeLong( len );
- out.flush();
+ new BufferedOutputStream(NetUtils.getOutputStream(sock,WRITE_TIMEOUT)));
+ try {
+ //write the header.
+ out.writeShort( DATA_TRANSFER_VERSION );
+ out.write( OP_READ_BLOCK );
+ out.writeLong( blockId );
+ out.writeLong( startOffset );
+ out.writeLong( len );
+ out.flush();
+ } finally {
+ IOUtils.closeStream(out);
+ }
+
//
// Get bytes in block, set streams
//
DataInputStream in = new DataInputStream(
- new BufferedInputStream(sock.getInputStream(), bufferSize));
+ new BufferedInputStream(NetUtils.getInputStream(sock),
+ bufferSize));
if ( in.readShort() != OP_STATUS_SUCCESS ) {
throw new IOException("Got error in response to OP_READ_BLOCK " +
@@ -921,7 +926,7 @@
*/
void checksumOk(Socket sock) {
try {
- OutputStream out = sock.getOutputStream();
+ OutputStream out = NetUtils.getOutputStream(sock, WRITE_TIMEOUT);
byte buf[] = { (OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
(OP_STATUS_CHECKSUM_OK) & 0xff };
out.write(buf);
@@ -2065,12 +2070,16 @@
s.setSoTimeout(timeoutValue);
s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
LOG.debug("Send buf size " + s.getSendBufferSize());
+ long writeTimeout = WRITE_TIMEOUT_EXTENSION * nodes.length +
+ WRITE_TIMEOUT;
//
// Xmit header info to datanode
//
- DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream(), buffersize));
- blockReplyStream = new DataInputStream(s.getInputStream());
+ DataOutputStream out = new DataOutputStream(
+ new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout),
+ buffersize));
+ blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
out.writeShort( DATA_TRANSFER_VERSION );
out.write( OP_WRITE_BLOCK );
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Mar 5 20:18:26 2008
@@ -26,6 +26,8 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketInputStream;
+import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -38,6 +40,8 @@
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.security.NoSuchAlgorithmException;
@@ -249,7 +253,8 @@
// find free port
- ServerSocket ss = new ServerSocket(tmpPort, 0, socAddr.getAddress());
+ ServerSocket ss = ServerSocketChannel.open().socket();
+ Server.bind(ss, socAddr, 0);
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
// adjust machine name with the actual port
tmpPort = ss.getLocalPort();
@@ -812,7 +817,7 @@
private static void receiveResponse(Socket s, int numTargets) throws IOException {
// check the response
DataInputStream reply = new DataInputStream(new BufferedInputStream(
- s.getInputStream(), BUFFER_SIZE));
+ new SocketInputStream(s), BUFFER_SIZE));
try {
for (int i = 0; i < numTargets; i++) {
short opStatus = reply.readShort();
@@ -828,7 +833,8 @@
/* utility function for sending a respose */
private static void sendResponse(Socket s, short opStatus) throws IOException {
- DataOutputStream reply = new DataOutputStream(s.getOutputStream());
+ DataOutputStream reply =
+ new DataOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT));
try {
reply.writeShort(opStatus);
reply.flush();
@@ -926,7 +932,7 @@
DataInputStream in=null;
try {
in = new DataInputStream(
- new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
+ new BufferedInputStream(new SocketInputStream(s), BUFFER_SIZE));
short version = in.readShort();
if ( version != DATA_TRANSFER_VERSION ) {
throw new IOException( "Version Mismatch" );
@@ -994,7 +1000,8 @@
// send the block
DataOutputStream out = new DataOutputStream(
- new BufferedOutputStream(s.getOutputStream(), SMALL_BUFFER_SIZE));
+ new BufferedOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT),
+ SMALL_BUFFER_SIZE));
BlockSender blockSender = null;
try {
try {
@@ -1084,7 +1091,8 @@
s.getInetAddress().toString(), isRecovery, client);
// get a connection back to the previous target
- replyOut = new DataOutputStream(s.getOutputStream());
+ replyOut = new DataOutputStream(
+ new SocketOutputStream(s, WRITE_TIMEOUT));
//
// Open network conn to backup machine, if
@@ -1095,16 +1103,19 @@
// Connect to backup machine
mirrorNode = targets[0].getName();
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
- mirrorSock = new Socket();
+ mirrorSock = SocketChannel.open().socket();
try {
int timeoutValue = numTargets * socketTimeout;
+ int writeTimeout = WRITE_TIMEOUT +
+ (WRITE_TIMEOUT_EXTENSION * numTargets);
mirrorSock.connect(mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
mirrorOut = new DataOutputStream(
- new BufferedOutputStream(mirrorSock.getOutputStream(),
- BUFFER_SIZE));
- mirrorIn = new DataInputStream(mirrorSock.getInputStream());
+ new BufferedOutputStream(
+ new SocketOutputStream(mirrorSock, writeTimeout),
+ BUFFER_SIZE));
+ mirrorIn = new DataInputStream(new SocketInputStream(mirrorSock));
// Write header: Copied from DFSClient.java!
mirrorOut.writeShort( DATA_TRANSFER_VERSION );
@@ -1214,7 +1225,7 @@
byte [] buf = new byte[(int)fileSize];
IOUtils.readFully(checksumIn, buf, 0, buf.length);
- out = new DataOutputStream(s.getOutputStream());
+ out = new DataOutputStream(new SocketOutputStream(s, WRITE_TIMEOUT));
out.writeByte(OP_STATUS_SUCCESS);
out.writeInt(buf.length);
@@ -1224,6 +1235,7 @@
out.writeInt(0);
} finally {
xceiverCount.decr();
+ IOUtils.closeStream(out);
IOUtils.closeStream(checksumIn);
}
}
@@ -1255,12 +1267,13 @@
// get the output stream to the target
InetSocketAddress targetAddr = NetUtils.createSocketAddr(target.getName());
- targetSock = new Socket();
+ targetSock = SocketChannel.open().socket();
targetSock.connect(targetAddr, socketTimeout);
targetSock.setSoTimeout(socketTimeout);
targetOut = new DataOutputStream(new BufferedOutputStream(
- targetSock.getOutputStream(), SMALL_BUFFER_SIZE));
+ new SocketOutputStream(targetSock, WRITE_TIMEOUT),
+ SMALL_BUFFER_SIZE));
/* send request to the target */
// fist write header info
@@ -2541,12 +2554,14 @@
try {
InetSocketAddress curTarget =
NetUtils.createSocketAddr(targets[0].getName());
- sock = new Socket();
+ sock = SocketChannel.open().socket();
sock.connect(curTarget, socketTimeout);
sock.setSoTimeout(targets.length * socketTimeout);
+ long writeTimeout = WRITE_TIMEOUT +
+ WRITE_TIMEOUT_EXTENSION * (targets.length-1);
out = new DataOutputStream(new BufferedOutputStream(
- sock.getOutputStream(), SMALL_BUFFER_SIZE));
+ new SocketOutputStream(sock, writeTimeout), SMALL_BUFFER_SIZE));
blockSender = new BlockSender(b, 0, -1, false, false, false);
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Mar 5 20:18:26 2008
@@ -125,6 +125,8 @@
public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
public static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD;
public static int READ_TIMEOUT = 60 * 1000;
+ public static int WRITE_TIMEOUT = 10 * 60 * 1000;
+ public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
// We need to limit the length and depth of a path in the filesystem. HADOOP-438
// Currently we set the maximum length to 8k characters and the maximum depth to 1k.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Mar 5 20:18:26 2008
@@ -185,7 +185,7 @@
socket.setSoTimeout(timeout);
this.in = new DataInputStream
(new BufferedInputStream
- (new FilterInputStream(socket.getInputStream()) {
+ (new FilterInputStream(NetUtils.getInputStream(socket)) {
public int read(byte[] buf, int off, int len) throws IOException {
int value = super.read(buf, off, len);
if (readingCall != null) {
@@ -196,7 +196,7 @@
}));
this.out = new DataOutputStream
(new BufferedOutputStream
- (new FilterOutputStream(socket.getOutputStream()) {
+ (new FilterOutputStream(NetUtils.getOutputStream(socket)) {
public void write(byte[] buf, int o, int len) throws IOException {
out.write(buf, o, len);
if (writingCall != null) {
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Server.java Wed Mar 5 20:18:26 2008
@@ -176,8 +176,8 @@
* @throws UnknownHostException if the address isn't a valid host name
* @throws IOException other random errors from bind
*/
- static void bind(ServerSocket socket, InetSocketAddress address,
- int backlog) throws IOException {
+ public static void bind(ServerSocket socket, InetSocketAddress address,
+ int backlog) throws IOException {
try {
socket.bind(address, backlog);
} catch (BindException e) {
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/NetUtils.java Wed Mar 5 20:18:26 2008
@@ -17,7 +17,11 @@
*/
package org.apache.hadoop.net;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.net.URI;
import java.util.Map.Entry;
import java.util.*;
@@ -223,5 +227,106 @@
}
return l;
}
+ }
+
+ /**
+ * Same as getInputStream(socket, socket.getSoTimeout()).<br><br>
+ *
+ * From documentation for {@link #getInputStream(Socket, long)}:<br>
+ * Returns InputStream for the socket. If the socket has an associated
+ * SocketChannel then it returns a
+ * {@link SocketInputStream} with the given timeout. If the socket does not
+ * have a channel, {@link Socket#getInputStream()} is returned. In the later
+ * case, the timeout argument is ignored and the timeout set with
+ * {@link Socket#setSoTimeout(int)} applies for reads.<br><br>
+ *
+ * Any socket created using socket factories returned by {@link #NetUtils},
+ * must use this interface instead of {@link Socket#getInputStream()}.
+ *
+ * @see #getInputStream(Socket, long)
+ *
+ * @param socket
+ * @return InputStream for reading from the socket.
+ * @throws IOException
+ */
+ public static InputStream getInputStream(Socket socket)
+ throws IOException {
+ return getInputStream(socket, socket.getSoTimeout());
+ }
+
+ /**
+ * Returns InputStream for the socket. If the socket has an associated
+ * SocketChannel then it returns a
+ * {@link SocketInputStream} with the given timeout. If the socket does not
+ * have a channel, {@link Socket#getInputStream()} is returned. In the later
+ * case, the timeout argument is ignored and the timeout set with
+ * {@link Socket#setSoTimeout(int)} applies for reads.<br><br>
+ *
+ * Any socket created using socket factories returned by {@link #NetUtils},
+ * must use this interface instead of {@link Socket#getInputStream()}.
+ *
+ * @see Socket#getChannel()
+ *
+ * @param socket
+ * @param timeout timeout in milliseconds. This may not always apply. zero
+ * for waiting as long as necessary.
+ * @return InputStream for reading from the socket.
+ * @throws IOException
+ */
+ public static InputStream getInputStream(Socket socket, long timeout)
+ throws IOException {
+ return (socket.getChannel() == null) ?
+ socket.getInputStream() : new SocketInputStream(socket, timeout);
+ }
+
+ /**
+ * Same as getOutputStream(socket, 0). Timeout of zero implies write will
+ * wait until data is available.<br><br>
+ *
+ * From documentation for {@link #getOutputStream(Socket, long)} : <br>
+ * Returns OutputStream for the socket. If the socket has an associated
+ * SocketChannel then it returns a
+ * {@link SocketOutputStream} with the given timeout. If the socket does not
+ * have a channel, {@link Socket#getOutputStream()} is returned. In the later
+ * case, the timeout argument is ignored and the write will wait until
+ * data is available.<br><br>
+ *
+ * Any socket created using socket factories returned by {@link #NetUtils},
+ * must use this interface instead of {@link Socket#getOutputStream()}.
+ *
+ * @see #getOutputStream(Socket, long)
+ *
+ * @param socket
+ * @return OutputStream for writing to the socket.
+ * @throws IOException
+ */
+ public static OutputStream getOutputStream(Socket socket)
+ throws IOException {
+ return getOutputStream(socket, 0);
+ }
+
+ /**
+ * Returns OutputStream for the socket. If the socket has an associated
+ * SocketChannel then it returns a
+ * {@link SocketOutputStream} with the given timeout. If the socket does not
+ * have a channel, {@link Socket#getOutputStream()} is returned. In the later
+ * case, the timeout argument is ignored and the write will wait until
+ * data is available.<br><br>
+ *
+ * Any socket created using socket factories returned by {@link #NetUtils},
+ * must use this interface instead of {@link Socket#getOutputStream()}.
+ *
+ * @see Socket#getChannel()
+ *
+ * @param socket
+ * @param timeout timeout in milliseconds. This may not always apply. zero
+ * for waiting as long as necessary.
+ * @return OutputStream for writing to the socket.
+ * @throws IOException
+ */
+ public static OutputStream getOutputStream(Socket socket, long timeout)
+ throws IOException {
+ return (socket.getChannel() == null) ?
+ socket.getOutputStream() : new SocketOutputStream(socket, timeout);
}
}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java?rev=634153&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketIOWithTimeout.java Wed Mar 5 20:18:26 2008
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.Pipe;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This supports input and output streams for a socket channels.
+ * These streams can have a timeout.
+ */
+abstract class SocketIOWithTimeout {
+ // This is intentionally package private.
+
+ static final Log LOG = LogFactory.getLog(SocketIOWithTimeout.class);
+
+ private SelectableChannel channel;
+ private long timeout;
+ private boolean closed = false;
+
+ private static SelectorPool selector = new SelectorPool();
+
+ /* A timeout value of 0 implies wait for ever.
+ * We should have a value of timeout that implies zero wait.. i.e.
+ * read or write returns immediately.
+ *
+ * This will set channel to non-blocking.
+ */
+ SocketIOWithTimeout(SelectableChannel channel, long timeout)
+ throws IOException {
+ checkChannelValidity(channel);
+
+ this.channel = channel;
+ this.timeout = timeout;
+ // Set non-blocking
+ channel.configureBlocking(false);
+ }
+
+ void close() {
+ closed = true;
+ }
+
+ boolean isOpen() {
+ return !closed && channel.isOpen();
+ }
+
+ SelectableChannel getChannel() {
+ return channel;
+ }
+
+ /**
+ * Utility function to check if channel is ok.
+ * Mainly to throw IOException instead of runtime exception
+ * in case of mismatch. This mismatch can occur for many runtime
+ * reasons.
+ */
+ static void checkChannelValidity(Object channel) throws IOException {
+ if (channel == null) {
+ /* Most common reason is that original socket does not have a channel.
+ * So making this an IOException rather than a RuntimeException.
+ */
+ throw new IOException("Channel is null. Check " +
+ "how the channel or socket is created.");
+ }
+
+ if (!(channel instanceof SelectableChannel)) {
+ throw new IOException("Channel should be a SelectableChannel");
+ }
+ }
+
+ /**
+ * Performs actual IO operations. This is not expected to block.
+ *
+ * @param buf
+ * @return number of bytes (or some equivalent). 0 implies underlying
+ * channel is drained completely. We will wait if more IO is
+ * required.
+ * @throws IOException
+ */
+ abstract int performIO(ByteBuffer buf) throws IOException;
+
+ /**
+ * Performs one IO and returns number of bytes read or written.
+ * It waits up to the specified timeout. If the channel is
+ * not read before the timeout, SocketTimeoutException is thrown.
+ *
+ * @param buf buffer for IO
+ * @param ops Selection Ops used for waiting. Suggested values:
+ * SelectionKey.OP_READ while reading and SelectionKey.OP_WRITE while
+ * writing.
+ *
+ * @return number of bytes read or written. negative implies end of stream.
+ * @throws IOException
+ */
+ int doIO(ByteBuffer buf, int ops) throws IOException {
+
+ /* For now only one thread is allowed. If user want to read or write
+ * from multiple threads, multiple streams could be created. In that
+ * case multiple threads work as well as underlying channel supports it.
+ */
+ if (!buf.hasRemaining()) {
+ throw new IllegalArgumentException("Buffer has no data left.");
+ //or should we just return 0?
+ }
+
+ while (buf.hasRemaining()) {
+ if (closed) {
+ return -1;
+ }
+
+ try {
+ int n = performIO(buf);
+ if (n != 0) {
+ // successful io or an error.
+ return n;
+ }
+ } catch (IOException e) {
+ if (!channel.isOpen()) {
+ closed = true;
+ }
+ throw e;
+ }
+
+ //now wait for socket to be ready.
+ int count = 0;
+ try {
+ count = selector.select(channel, ops, timeout);
+ } catch (IOException e) { //unexpected IOException.
+ closed = true;
+ throw e;
+ }
+
+ if (count == 0) {
+ String channelStr = "Unknown Channel";
+ if (channel instanceof SocketChannel) {
+ Socket sock = ((SocketChannel)channel).socket();
+ SocketAddress remote = sock.getRemoteSocketAddress();
+ SocketAddress local = sock.getLocalSocketAddress();
+ channelStr = (remote == null ? "Unknown Addr" : remote) +
+ " (local: " +
+ (local == null ? "Unknown Addr" : local) + ")";
+ } else if (channel instanceof Pipe.SinkChannel ||
+ channel instanceof Pipe.SourceChannel) {
+ channelStr = "pipe";
+ } else if (channel instanceof DatagramChannel) {
+ channelStr = "datagram channel";
+ }
+
+ String waitingFor = ""+ops;
+ if (ops == SelectionKey.OP_READ) {
+ waitingFor = "read";
+ } else if (ops == SelectionKey.OP_WRITE) {
+ waitingFor = "write";
+ }
+
+ throw new SocketTimeoutException(timeout + " millis timeout while " +
+ "waiting for " + channelStr +
+ " to be ready for " + waitingFor);
+ }
+ // otherwise the socket should be ready for io.
+ }
+
+ return 0; // does not reach here.
+ }
+
+ /**
+ * This maintains a pool of selectors. These selectors are closed
+ * once they are idle (unused) for a few seconds.
+ */
+ private static class SelectorPool {
+
+ private static class SelectorInfo {
+ Selector selector;
+ long lastActivityTime;
+ LinkedList<SelectorInfo> queue;
+
+ void close() {
+ if (selector != null) {
+ try {
+ selector.close();
+ } catch (IOException e) {
+ LOG.warn("Unexpected exception while closing selector : " +
+ StringUtils.stringifyException(e));
+ }
+ }
+ }
+ }
+
+ private static class ProviderInfo {
+ SelectorProvider provider;
+ LinkedList<SelectorInfo> queue; // lifo
+ ProviderInfo next;
+ }
+
+ private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
+
+ private ProviderInfo providerList = null;
+
+ /**
+ * Waits on the channel with the given timeout using one of the
+ * cached selectors. It also removes any cached selectors that are
+ * idle for a few seconds.
+ *
+ * @param channel
+ * @param ops
+ * @param timeout
+ * @return
+ * @throws IOException
+ */
+ int select(SelectableChannel channel, int ops, long timeout)
+ throws IOException {
+
+ SelectorInfo info = get(channel);
+
+ SelectionKey key = null;
+ int ret = -1;
+
+ try {
+ key = channel.register(info.selector, ops);
+ ret = info.selector.select(timeout);
+ } finally {
+ if (key != null) {
+ key.cancel();
+ }
+
+ //clear the canceled key.
+ try {
+ info.selector.selectNow();
+ } catch (IOException e) {
+ LOG.info("Unexpected Exception while clearing selector : " +
+ StringUtils.stringifyException(e));
+ // don't put the selector back.
+ info.close();
+ return ret;
+ }
+
+ release(info);
+ }
+
+ return ret;
+ }
+
+ /**
+ * Takes one selector from end of LRU list of free selectors.
+ * If there are no selectors awailable, it creates a new selector.
+ * Also invokes trimIdleSelectors().
+ *
+ * @param channel
+ * @return
+ * @throws IOException
+ */
+ private synchronized SelectorInfo get(SelectableChannel channel)
+ throws IOException {
+ SelectorInfo selInfo = null;
+
+ SelectorProvider provider = channel.provider();
+
+ // pick the list : rarely there is more than one provider in use.
+ ProviderInfo pList = providerList;
+ while (pList != null && pList.provider != provider) {
+ pList = pList.next;
+ }
+ if (pList == null) {
+ //LOG.info("Creating new ProviderInfo : " + provider.toString());
+ pList = new ProviderInfo();
+ pList.provider = provider;
+ pList.queue = new LinkedList<SelectorInfo>();
+ pList.next = providerList;
+ providerList = pList;
+ }
+
+ LinkedList<SelectorInfo> queue = pList.queue;
+
+ if (queue.isEmpty()) {
+ Selector selector = provider.openSelector();
+ selInfo = new SelectorInfo();
+ selInfo.selector = selector;
+ selInfo.queue = queue;
+ } else {
+ selInfo = queue.removeLast();
+ }
+
+ trimIdleSelectors(System.currentTimeMillis());
+ return selInfo;
+ }
+
+ /**
+ * puts selector back at the end of LRU list of free selectos.
+ * Also invokes trimIdleSelectors().
+ *
+ * @param info
+ */
+ private synchronized void release(SelectorInfo info) {
+ long now = System.currentTimeMillis();
+ trimIdleSelectors(now);
+ info.lastActivityTime = now;
+ info.queue.addLast(info);
+ }
+
+ /**
+ * Closes selectors that are idle for IDLE_TIMEOUT (10 sec). It does not
+ * traverse the whole list, just over the one that have crossed
+ * the timeout.
+ */
+ private void trimIdleSelectors(long now) {
+ long cutoff = now - IDLE_TIMEOUT;
+
+ for(ProviderInfo pList=providerList; pList != null; pList=pList.next) {
+ if (pList.queue.isEmpty()) {
+ continue;
+ }
+ for(Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
+ SelectorInfo info = it.next();
+ if (info.lastActivityTime > cutoff) {
+ break;
+ }
+ it.remove();
+ info.close();
+ }
+ }
+ }
+ }
+}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java?rev=634153&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketInputStream.java Wed Mar 5 20:18:26 2008
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+
+
+/**
+ * This implements an input stream that can have a timeout while reading.
+ * This sets non-blocking flag on the socket channel.
+ * So after create this object, read() on
+ * {@link Socket#getInputStream()} and write() on
+ * {@link Socket#getOutputStream()} for the associated socket will throw
+ * IllegalBlockingModeException.
+ * Please use {@link SocketOutputStream} for writing.
+ */
+public class SocketInputStream extends InputStream
+ implements ReadableByteChannel {
+
+ private SocketIOWithTimeout reader;
+
+ private static class Reader extends SocketIOWithTimeout {
+ ReadableByteChannel channel;
+
+ Reader(ReadableByteChannel channel, long timeout) throws IOException {
+ super((SelectableChannel)channel, timeout);
+ this.channel = channel;
+ }
+
+ int performIO(ByteBuffer buf) throws IOException {
+ return channel.read(buf);
+ }
+ }
+
+ /**
+ * Create a new input stream with the given timeout. If the timeout
+ * is zero, it will be treated as infinite timeout. The socket's
+ * channel will be configured to be non-blocking.
+ *
+ * @param channel
+ * Channel for reading, should also be a {@link SelectableChannel}.
+ * The channel will be configured to be non-blocking.
+ * @param timeout timeout in milliseconds. must not be negative.
+ * @throws IOException
+ */
+ public SocketInputStream(ReadableByteChannel channel, long timeout)
+ throws IOException {
+ SocketIOWithTimeout.checkChannelValidity(channel);
+ reader = new Reader(channel, timeout);
+ }
+
+ /**
+ * Same as SocketInputStream(socket.getChannel(), timeout): <br><br>
+ *
+ * Create a new input stream with the given timeout. If the timeout
+ * is zero, it will be treated as infinite timeout. The socket's
+ * channel will be configured to be non-blocking.
+ *
+ * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
+ *
+ * @param socket should have a channel associated with it.
+ * @param timeout timeout timeout in milliseconds. must not be negative.
+ * @throws IOException
+ */
+ public SocketInputStream(Socket socket, long timeout)
+ throws IOException {
+ this(socket.getChannel(), timeout);
+ }
+
+ /**
+ * Same as SocketInputStream(socket.getChannel(), socket.getSoTimeout())
+ * :<br><br>
+ *
+ * Create a new input stream with the given timeout. If the timeout
+ * is zero, it will be treated as infinite timeout. The socket's
+ * channel will be configured to be non-blocking.
+ * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
+ *
+ * @param socket should have a channel associated with it.
+ * @throws IOException
+ */
+ public SocketInputStream(Socket socket) throws IOException {
+ this(socket.getChannel(), socket.getSoTimeout());
+ }
+
+ @Override
+ public int read() throws IOException {
+ /* Allocation can be removed if required.
+ * probably no need to optimize or encourage single byte read.
+ */
+ byte[] buf = new byte[1];
+ if (read(buf, 0, 1) > 0) {
+ return (byte)buf[0];
+ }
+ throw new IOException("Could not read from stream");
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException {
+ return read(ByteBuffer.wrap(b, off, len));
+ }
+
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ //ReadableByteChannel interface
+
+ public boolean isOpen() {
+ return reader.isOpen();
+ }
+
+ public int read(ByteBuffer dst) throws IOException {
+ return reader.doIO(dst, SelectionKey.OP_READ);
+ }
+}
Added: hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java?rev=634153&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/SocketOutputStream.java Wed Mar 5 20:18:26 2008
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * This implements an output stream that can have a timeout while writing.
+ * This sets non-blocking flag on the socket channel.
+ * So after creating this object , read() on
+ * {@link Socket#getInputStream()} and write() on
+ * {@link Socket#getOutputStream()} on the associated socket will throw
+ * llegalBlockingModeException.
+ * Please use {@link SocketInputStream} for reading.
+ */
+public class SocketOutputStream extends OutputStream
+ implements WritableByteChannel {
+
+ private SocketIOWithTimeout writer;
+
+ private static class Writer extends SocketIOWithTimeout {
+ WritableByteChannel channel;
+
+ Writer(WritableByteChannel channel, long timeout) throws IOException {
+ super((SelectableChannel)channel, timeout);
+ this.channel = channel;
+ }
+
+ int performIO(ByteBuffer buf) throws IOException {
+ return channel.write(buf);
+ }
+ }
+
+ /**
+ * Create a new ouput stream with the given timeout. If the timeout
+ * is zero, it will be treated as infinite timeout. The socket's
+ * channel will be configured to be non-blocking.
+ *
+ * @param channel
+ * Channel for writing, should also be a {@link SelectableChannel}.
+ * The channel will be configured to be non-blocking.
+ * @param timeout timeout in milliseconds. must not be negative.
+ * @throws IOException
+ */
+ public SocketOutputStream(WritableByteChannel channel, long timeout)
+ throws IOException {
+ SocketIOWithTimeout.checkChannelValidity(channel);
+ writer = new Writer(channel, timeout);
+ }
+
+ /**
+ * Same as SocketOutputStream(socket.getChannel(), timeout):<br><br>
+ *
+ * Create a new ouput stream with the given timeout. If the timeout
+ * is zero, it will be treated as infinite timeout. The socket's
+ * channel will be configured to be non-blocking.
+ *
+ * @see SocketOutputStream#SocketOutputStream(WritableByteChannel, long)
+ *
+ * @param socket should have a channel associated with it.
+ * @param timeout timeout timeout in milliseconds. must not be negative.
+ * @throws IOException
+ */
+ public SocketOutputStream(Socket socket, long timeout)
+ throws IOException {
+ this(socket.getChannel(), timeout);
+ }
+
+ public void write(int b) throws IOException {
+ /* If we need to, we can optimize this allocation.
+ * probably no need to optimize or encourage single byte writes.
+ */
+ byte[] buf = new byte[1];
+ buf[0] = (byte)b;
+ write(buf, 0, 1);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ ByteBuffer buf = ByteBuffer.wrap(b, off, len);
+ while (buf.hasRemaining()) {
+ try {
+ if (write(buf) < 0) {
+ throw new IOException("The stream is closed");
+ }
+ } catch (IOException e) {
+ /* Unlike read, write can not inform user of partial writes.
+ * So will close this if there was a partial write.
+ */
+ if (buf.capacity() > buf.remaining()) {
+ writer.close();
+ }
+ throw e;
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ //WritableByteChannle interface
+
+ public boolean isOpen() {
+ return writer.isOpen();
+ }
+
+ public int write(ByteBuffer src) throws IOException {
+ return writer.doIO(src, SelectionKey.OP_WRITE);
+ }
+}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java?rev=634153&r1=634152&r2=634153&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java Wed Mar 5 20:18:26 2008
@@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
import javax.net.SocketFactory;
@@ -39,7 +40,22 @@
/* @inheritDoc */
@Override
public Socket createSocket() throws IOException {
- return new Socket();
+ /*
+ * NOTE: This returns an NIO socket so that it has an associated
+ * SocketChannel. As of now, this unfortunately makes streams returned
+ * by Socket.getInputStream() and Socket.getOutputStream() unusable
+ * (because a blocking read on input stream blocks write on output stream
+ * and vice versa).
+ *
+ * So users of these socket factories should use
+ * NetUtils.getInputStream(socket) and
+ * NetUtils.getOutputStream(socket) instead.
+ *
+ * A solution for hiding from this from user is to write a
+ * 'FilterSocket' on the lines of FilterInputStream and extend it by
+ * overriding getInputStream() and getOutputStream().
+ */
+ return SocketChannel.open().socket();
}
/* @inheritDoc */
Added: hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java?rev=634153&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java Wed Mar 5 20:18:26 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.channels.Pipe;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import junit.framework.TestCase;
+
+/**
+ * This tests timout out from SocketInputStream and
+ * SocketOutputStream using pipes.
+ *
+ * Normal read and write using these streams are tested by pretty much
+ * every DFS unit test.
+ */
+public class TestSocketIOWithTimeout extends TestCase {
+
+ Log LOG = LogFactory.getLog(TestSocketIOWithTimeout.class);
+
+ private static int TIMEOUT = 1*1000;
+ private static String TEST_STRING = "1234567890";
+
+ private void doIO(InputStream in, OutputStream out) throws IOException {
+ /* Keep on writing or reading until we get SocketTimeoutException.
+ * It expects this exception to occur within 100 millis of TIMEOUT.
+ */
+ byte buf[] = new byte[4192];
+
+ while (true) {
+ long start = System.currentTimeMillis();
+ try {
+ if (in != null) {
+ in.read(buf);
+ } else {
+ out.write(buf);
+ }
+ } catch (SocketTimeoutException e) {
+ long diff = System.currentTimeMillis() - start;
+ LOG.info("Got SocketTimeoutException as expected after " +
+ diff + " millis : " + e.getMessage());
+ assertTrue(Math.abs(TIMEOUT - diff) <= 100);
+ break;
+ }
+ }
+ }
+
+ public void testSocketIOWithTimeout() throws IOException {
+
+ // first open pipe:
+ Pipe pipe = Pipe.open();
+ Pipe.SourceChannel source = pipe.source();
+ Pipe.SinkChannel sink = pipe.sink();
+
+ try {
+ InputStream in = new SocketInputStream(source, TIMEOUT);
+ OutputStream out = new SocketOutputStream(sink, TIMEOUT);
+
+ byte[] writeBytes = TEST_STRING.getBytes();
+ byte[] readBytes = new byte[writeBytes.length];
+
+ out.write(writeBytes);
+ doIO(null, out);
+
+ in.read(readBytes);
+ assertTrue(Arrays.equals(writeBytes, readBytes));
+ doIO(in, null);
+ } finally {
+ if (source != null) {
+ source.close();
+ }
+ if (sink != null) {
+ sink.close();
+ }
+ }
+ }
+}