You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/05/03 23:58:08 UTC

svn commit: r1333650 - /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java

Author: todd
Date: Thu May  3 21:58:08 2012
New Revision: 1333650

URL: http://svn.apache.org/viewvc?rev=1333650&view=rev
Log:
Amend previous commit of HADOOP-8350 (missed new SocketInputWrapper file)

Added:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java?rev=1333650&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketInputWrapper.java Thu May  3 21:58:08 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.io.FilterInputStream;
+
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A wrapper stream around a socket which allows setting of its timeout. If the
+ * socket has a channel, this uses non-blocking IO via the package-private
+ * {@link SocketInputStream} implementation. Otherwise, timeouts are managed by
+ * setting the underlying socket timeout itself.
+ */
+@InterfaceAudience.LimitedPrivate("HDFS")
+@InterfaceStability.Unstable
+public class SocketInputWrapper extends FilterInputStream {
+  private final Socket socket;
+  private final boolean hasChannel;
+
+  /** Wraps {@code is}, which must be a SocketInputStream if s has a channel. */
+  SocketInputWrapper(Socket s, InputStream is) {
+    super(is);
+    this.socket = s;
+    this.hasChannel = s.getChannel() != null;
+    if (hasChannel) {
+      Preconditions.checkArgument(is instanceof SocketInputStream,
+          "Expected a SocketInputStream when there is a channel. Got: %s", is);
+    }
+  }
+
+  /**
+   * Set the timeout for reads from this stream.
+   * 
+   * Note: the behavior here can differ subtly depending on whether the
+   * underlying socket has an associated Channel. In particular, if there is no
+   * channel, then this call will affect the socket timeout for <em>all</em>
+   * readers of this socket. If there is a channel, then this call will affect
+   * the timeout only for <em>this</em> stream. As such, it is recommended to
+   * only create one {@link SocketInputWrapper} instance per socket.
+   * 
+   * @param timeoutMs the new timeout, 0 for no timeout. Without a channel,
+   *          values above {@link Integer#MAX_VALUE} are capped at that maximum
+   * @throws SocketException if the timeout cannot be set
+   */
+  public void setTimeout(long timeoutMs) throws SocketException {
+    if (hasChannel) {
+      ((SocketInputStream)in).setTimeout(timeoutMs);
+    } else {
+      // Clamp before narrowing: a bare (int) cast wraps, e.g. 2^32 becomes 0.
+      socket.setSoTimeout((int)Math.max(Integer.MIN_VALUE,
+          Math.min(Integer.MAX_VALUE, timeoutMs)));
+    }
+  }
+
+  /**
+   * @return an underlying ReadableByteChannel implementation.
+   * @throws IllegalStateException if this socket does not have a channel
+   */
+  public ReadableByteChannel getReadableByteChannel() {
+    Preconditions.checkState(hasChannel,
+        "Socket %s does not have a channel", this.socket);
+    return (SocketInputStream)in;
+  }
+}
\ No newline at end of file