Posted to common-commits@hadoop.apache.org by su...@apache.org on 2012/03/11 18:56:06 UTC

svn commit: r1299412 - in /hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common: ./ dev-support/ src/main/docs/src/documentation/content/xdocs/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/ha/ src/main/java/org/...

Author: suresh
Date: Sun Mar 11 17:55:58 2012
New Revision: 1299412

URL: http://svn.apache.org/viewvc?rev=1299412&view=rev
Log:
HDFS-1623. Merging change r1296534 from trunk to 0.23

Added:
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/
      - copied from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/BadFencingConfigurationException.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/BadFencingConfigurationException.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverFailedException.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverFailedException.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FenceMethod.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FenceMethod.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthCheckFailedException.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthCheckFailedException.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/NodeFencer.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/NodeFencer.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ServiceFailedException.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ServiceFailedException.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ShellCommandFencer.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ShellCommandFencer.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/SshFenceByTcpPort.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/SshFenceByTcpPort.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/StreamPumper.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/StreamPumper.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/
      - copied from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolPB.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolPB.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolTranslator.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolTranslator.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/
      - copied from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestFailoverController.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestFailoverController.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAAdmin.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAAdmin.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestShellCommandFencer.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestShellCommandFencer.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestSshFenceByTcpPort.java
      - copied unchanged from r1296534, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestSshFenceByTcpPort.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/pom.xml
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/service_level_auth.xml
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/StandbyException.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml Sun Mar 11 17:55:58 2012
@@ -278,8 +278,12 @@
       <!-- protobuf generated code -->
       <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/>
     </Match>
-		<Match>
+    <Match>
       <!-- protobuf generated code -->
       <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/>
     </Match>
+    <Match>
+      <!-- protobuf generated code -->
+      <Class name="~org\.apache\.hadoop\.ha\.proto\.HAServiceProtocolProtos.*"/>
+    </Match>
  </FindBugsFilter>

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/pom.xml?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/pom.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/pom.xml Sun Mar 11 17:55:58 2012
@@ -264,6 +264,38 @@
       <artifactId>json-simple</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>com.jcraft</groupId>
+      <artifactId>jsch</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.2</version>
+      <exclusions>
+        <exclusion>
+          <!-- otherwise seems to drag in junit 3.8.1 via jline -->
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.2</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/service_level_auth.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/service_level_auth.xml?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/service_level_auth.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/service_level_auth.xml Sun Mar 11 17:55:58 2012
@@ -138,6 +138,12 @@
            dfsadmin and mradmin commands to refresh the security policy in effect.
             </td>
           </tr>
+          <tr>
+            <td><code>security.ha.service.protocol.acl</code></td>
+            <td>ACL for HAService protocol used by HAAdmin to manage the
+            active and standby states of the namenode.
+            </td>
+          </tr>
         </table>
       </section>
       

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java Sun Mar 11 17:55:58 2012
@@ -114,11 +114,12 @@ public class CommonConfigurationKeys ext
   public static final String 
   HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_USER_MAPPINGS =
       "security.refresh.user.mappings.protocol.acl";
+  public static final String 
+  SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl";
   
   public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP =
       "hadoop.security.token.service.use_ip";
   public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT =
       true;
-
 }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java Sun Mar 11 17:55:58 2012
@@ -172,6 +172,11 @@ public class CommonConfigurationKeysPubl
   /** Default value for IPC_CLIENT_CONNECT_MAX_RETRIES_KEY */
   public static final int     IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT = 10;
   /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+  public static final String  IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY =
+    "ipc.client.connect.max.retries.on.timeouts";
+  /** Default value for IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY */
+  public static final int  IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 45;
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
   public static final String  IPC_CLIENT_TCPNODELAY_KEY =
     "ipc.client.tcpnodelay";
   /** Default value for IPC_CLIENT_TCPNODELAY_KEY */
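
For callers, the new knob reads like any other IPC client setting. A minimal sketch, assuming only a stock Configuration (the constant names are exactly the ones added above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

    public class TimeoutRetries {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Falls back to the new default of 45 if core-site.xml is silent.
        int n = conf.getInt(
            CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
            CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
        System.out.println("retries on socket timeouts: " + n);
      }
    }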

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java Sun Mar 11 17:55:58 2012
@@ -27,28 +27,28 @@ import org.apache.hadoop.ipc.RPC;
  * event of failover, and always returns the same proxy object. 
  */
 @InterfaceStability.Evolving
-public class DefaultFailoverProxyProvider implements FailoverProxyProvider {
+public class DefaultFailoverProxyProvider<T> implements FailoverProxyProvider<T> {
   
-  private Object proxy;
-  private Class<?> iface;
+  private T proxy;
+  private Class<T> iface;
   
-  public DefaultFailoverProxyProvider(Class<?> iface, Object proxy) {
+  public DefaultFailoverProxyProvider(Class<T> iface, T proxy) {
     this.proxy = proxy;
     this.iface = iface;
   }
 
   @Override
-  public Class<?> getInterface() {
+  public Class<T> getInterface() {
     return iface;
   }
 
   @Override
-  public Object getProxy() {
+  public T getProxy() {
     return proxy;
   }
 
   @Override
-  public void performFailover(Object currentProxy) {
+  public void performFailover(T currentProxy) {
     // Nothing to do.
   }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java Sun Mar 11 17:55:58 2012
@@ -29,7 +29,7 @@ import org.apache.hadoop.classification.
  * {@link RetryPolicy}.
  */
 @InterfaceStability.Evolving
-public interface FailoverProxyProvider extends Closeable {
+public interface FailoverProxyProvider<T> extends Closeable {
 
   /**
    * Get the proxy object which should be used until the next failover event
@@ -37,7 +37,7 @@ public interface FailoverProxyProvider e
    * 
    * @return the proxy object to invoke methods upon
    */
-  public Object getProxy();
+  public T getProxy();
 
   /**
    * Called whenever the associated {@link RetryPolicy} determines that an error
@@ -46,7 +46,7 @@ public interface FailoverProxyProvider e
    * @param currentProxy the proxy object which was being used before this
    *        failover event
    */
-  public void performFailover(Object currentProxy);
+  public void performFailover(T currentProxy);
 
   /**
    * Return a reference to the interface this provider's proxy objects actually
@@ -58,5 +58,5 @@ public interface FailoverProxyProvider e
    * @return the interface implemented by the proxy objects returned by
    *         {@link FailoverProxyProvider#getProxy()}
    */
-  public Class<?> getInterface();
+  public Class<T> getInterface();
 }
\ No newline at end of file
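
With the type parameter, providers and their callers drop the Object casts. A minimal sketch of a two-node provider for the HAServiceProtocol added in this commit (the class and its constructor are illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.ha.HAServiceProtocol;
    import org.apache.hadoop.io.retry.FailoverProxyProvider;

    class TwoNodeProxyProvider implements FailoverProxyProvider<HAServiceProtocol> {
      private final HAServiceProtocol first, second;
      private HAServiceProtocol current;

      TwoNodeProxyProvider(HAServiceProtocol first, HAServiceProtocol second) {
        this.first = first;
        this.second = second;
        this.current = first;
      }

      @Override
      public HAServiceProtocol getProxy() {
        return current; // typed: no cast needed at the call site
      }

      @Override
      public void performFailover(HAServiceProtocol currentProxy) {
        current = (current == first) ? second : first; // flip to the other node
      }

      @Override
      public Class<HAServiceProtocol> getInterface() {
        return HAServiceProtocol.class;
      }

      @Override
      public void close() throws IOException {
        // nothing to release in this sketch
      }
    }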

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java Sun Mar 11 17:55:58 2012
@@ -20,7 +20,6 @@ package org.apache.hadoop.io.retry;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -28,7 +27,9 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.util.ThreadUtil;
 import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcInvocationHandler;
 
 class RetryInvocationHandler implements RpcInvocationHandler {
@@ -39,6 +40,7 @@ class RetryInvocationHandler implements 
    * The number of times the associated proxyProvider has ever been failed over.
    */
   private long proxyProviderFailoverCount = 0;
+  private volatile boolean hasMadeASuccessfulCall = false;
   
   private RetryPolicy defaultPolicy;
   private Map<String,RetryPolicy> methodNameToPolicyMap;
@@ -79,47 +81,82 @@ class RetryInvocationHandler implements 
         invocationAttemptFailoverCount = proxyProviderFailoverCount;
       }
       try {
-        return invokeMethod(method, args);
+        Object ret = invokeMethod(method, args);
+        hasMadeASuccessfulCall = true;
+        return ret;
       } catch (Exception e) {
         boolean isMethodIdempotent = proxyProvider.getInterface()
             .getMethod(method.getName(), method.getParameterTypes())
             .isAnnotationPresent(Idempotent.class);
         RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount,
             isMethodIdempotent);
-        if (action == RetryAction.FAIL) {
-          LOG.warn("Exception while invoking " + method.getName()
-                   + " of " + currentProxy.getClass() + ". Not retrying.", e);
-          if (!method.getReturnType().equals(Void.TYPE)) {
-            throw e; // non-void methods can't fail without an exception
+        if (action.action == RetryAction.RetryDecision.FAIL) {
+          if (action.reason != null) {
+            LOG.warn("Exception while invoking " + 
+                currentProxy.getClass() + "." + method.getName() +
+                ". Not retrying because " + action.reason, e);
           }
-          return null;
-        } else if (action == RetryAction.FAILOVER_AND_RETRY) {
-          LOG.warn("Exception while invoking " + method.getName()
-              + " of " + currentProxy.getClass()
-              + " after " + invocationFailoverCount + " fail over attempts."
-              + " Trying to fail over.", e);
-          // Make sure that concurrent failed method invocations only cause a
-          // single actual fail over.
-          synchronized (proxyProvider) {
-            if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
-              proxyProvider.performFailover(currentProxy);
-              proxyProviderFailoverCount++;
-              currentProxy = proxyProvider.getProxy();
+          throw e;
+        } else { // retry or failover
+          // avoid logging the failover if this is the first call on this
+          // proxy object, and we successfully achieve the failover without
+          // any flip-flopping
+          boolean worthLogging = 
+            !(invocationFailoverCount == 0 && !hasMadeASuccessfulCall);
+          worthLogging |= LOG.isDebugEnabled();
+          if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY &&
+              worthLogging) {
+            String msg = "Exception while invoking " + method.getName()
+              + " of class " + currentProxy.getClass().getSimpleName();
+            if (invocationFailoverCount > 0) {
+              msg += " after " + invocationFailoverCount + " fail over attempts"; 
+            }
+            msg += ". Trying to fail over " + formatSleepMessage(action.delayMillis);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(msg, e);
             } else {
-              LOG.warn("A failover has occurred since the start of this method"
-                  + " invocation attempt.");
+              LOG.warn(msg);
+            }
+          } else {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Exception while invoking " + method.getName()
+                  + " of class " + currentProxy.getClass().getSimpleName() +
+                  ". Retrying " + formatSleepMessage(action.delayMillis), e);
             }
           }
-          invocationFailoverCount++;
-        }
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Exception while invoking " + method.getName()
-              + " of " + currentProxy.getClass() + ". Retrying.", e);
+          
+          if (action.delayMillis > 0) {
+            ThreadUtil.sleepAtLeastIgnoreInterrupts(action.delayMillis);
+          }
+          
+          if (action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
+            // Make sure that concurrent failed method invocations only cause a
+            // single actual fail over.
+            synchronized (proxyProvider) {
+              if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
+                proxyProvider.performFailover(currentProxy);
+                proxyProviderFailoverCount++;
+                currentProxy = proxyProvider.getProxy();
+              } else {
+                LOG.warn("A failover has occurred since the start of this method"
+                    + " invocation attempt.");
+              }
+            }
+            invocationFailoverCount++;
+          }
         }
       }
     }
   }
-
+  
+  private static String formatSleepMessage(long millis) {
+    if (millis > 0) {
+      return "after sleeping for " + millis + "ms.";
+    } else {
+      return "immediately.";
+    }
+  }
+  
   private Object invokeMethod(Method method, Object[] args) throws Throwable {
     try {
       if (!method.isAccessible()) {
@@ -138,9 +175,7 @@ class RetryInvocationHandler implements 
 
   @Override //RpcInvocationHandler
   public ConnectionId getConnectionId() {
-    RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
-        .getInvocationHandler(currentProxy);
-    return inv.getConnectionId();
+    return RPC.getConnectionIdForProxy(currentProxy);
   }
 
 }
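
The handler now consumes the richer RetryAction defined in RetryPolicy.java below, a decision plus an optional delay and reason, with sleeps funneled through ThreadUtil.sleepAtLeastIgnoreInterrupts(). A sketch of how a client wires it together, assuming the RetryProxy.create(Class, FailoverProxyProvider, RetryPolicy) overload in this package:

    import org.apache.hadoop.ha.HAServiceProtocol;
    import org.apache.hadoop.io.retry.RetryPolicies;
    import org.apache.hadoop.io.retry.RetryPolicy;
    import org.apache.hadoop.io.retry.RetryProxy;

    // 'provider' is a FailoverProxyProvider<HAServiceProtocol>, e.g. the
    // two-node sketch above.
    RetryPolicy policy = RetryPolicies.failoverOnNetworkException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, // fallback for other exceptions
        15,       // maxFailovers
        500,      // delayMillis: base sleep once failovers start repeating
        15000);   // maxDelayBase: cap on the exponential backoff base
    HAServiceProtocol proxy = (HAServiceProtocol) RetryProxy.create(
        HAServiceProtocol.class, provider, policy);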

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java Sun Mar 11 17:55:58 2012
@@ -33,6 +33,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * <p>
  * A collection of useful implementations of {@link RetryPolicy}.
@@ -42,6 +44,8 @@ public class RetryPolicies {
   
   public static final Log LOG = LogFactory.getLog(RetryPolicies.class);
   
+  private static final Random RAND = new Random();
+  
   /**
    * <p>
    * Try once, and fail by re-throwing the exception.
@@ -52,14 +56,6 @@ public class RetryPolicies {
   
   /**
    * <p>
-   * Try once, and fail silently for <code>void</code> methods, or by
-   * re-throwing the exception for non-<code>void</code> methods.
-   * </p>
-   */
-  public static final RetryPolicy TRY_ONCE_DONT_FAIL = new TryOnceDontFail();
-  
-  /**
-   * <p>
    * Keep trying forever.
    * </p>
    */
@@ -137,16 +133,17 @@ public class RetryPolicies {
   
   public static final RetryPolicy failoverOnNetworkException(
       RetryPolicy fallbackPolicy, int maxFailovers) {
-    return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers);
+    return failoverOnNetworkException(fallbackPolicy, maxFailovers, 0, 0);
   }
   
-  static class TryOnceThenFail implements RetryPolicy {
-    public RetryAction shouldRetry(Exception e, int retries, int failovers,
-        boolean isMethodIdempotent) throws Exception {
-      throw e;
-    }
+  public static final RetryPolicy failoverOnNetworkException(
+      RetryPolicy fallbackPolicy, int maxFailovers, long delayMillis,
+      long maxDelayBase) {
+    return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers,
+        delayMillis, maxDelayBase);
   }
-  static class TryOnceDontFail implements RetryPolicy {
+  
+  static class TryOnceThenFail implements RetryPolicy {
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
         boolean isMethodIdempotent) throws Exception {
       return RetryAction.FAIL;
@@ -174,14 +171,10 @@ public class RetryPolicies {
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
         boolean isMethodIdempotent) throws Exception {
       if (retries >= maxRetries) {
-        throw e;
-      }
-      try {
-        timeUnit.sleep(calculateSleepTime(retries));
-      } catch (InterruptedException ie) {
-        // retry
+        return RetryAction.FAIL;
       }
-      return RetryAction.RETRY;
+      return new RetryAction(RetryAction.RetryDecision.RETRY,
+          timeUnit.toMillis(calculateSleepTime(retries)));
     }
     
     protected abstract long calculateSleepTime(int retries);
@@ -268,7 +261,7 @@ public class RetryPolicies {
   }
   
   static class ExponentialBackoffRetry extends RetryLimited {
-    private Random r = new Random();
+    
     public ExponentialBackoffRetry(
         int maxRetries, long sleepTime, TimeUnit timeUnit) {
       super(maxRetries, sleepTime, timeUnit);
@@ -276,16 +269,19 @@ public class RetryPolicies {
     
     @Override
     protected long calculateSleepTime(int retries) {
-      return sleepTime*r.nextInt(1<<(retries+1));
+      return calculateExponentialTime(sleepTime, retries + 1);
     }
   }
   
-  /*
+  /**
    * Fail over and retry in the case of:
    *   Remote StandbyException (server is up, but is not the active server)
    *   Immediate socket exceptions (e.g. no route to host, econnrefused)
    *   Socket exceptions after initial connection when operation is idempotent
    * 
+   * The first failover is immediate, while all subsequent failovers wait an
+   * exponentially increasing random amount of time.
+   * 
    * Fail immediately in the case of:
    *   Socket exceptions after initial connection when operation is not idempotent
    * 
@@ -295,33 +291,49 @@ public class RetryPolicies {
     
     private RetryPolicy fallbackPolicy;
     private int maxFailovers;
+    private long delayMillis;
+    private long maxDelayBase;
     
     public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
         int maxFailovers) {
+      this(fallbackPolicy, maxFailovers, 0, 0);
+    }
+    
+    public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
+        int maxFailovers, long delayMillis, long maxDelayBase) {
       this.fallbackPolicy = fallbackPolicy;
       this.maxFailovers = maxFailovers;
+      this.delayMillis = delayMillis;
+      this.maxDelayBase = maxDelayBase;
     }
 
     @Override
     public RetryAction shouldRetry(Exception e, int retries,
         int failovers, boolean isMethodIdempotent) throws Exception {
       if (failovers >= maxFailovers) {
-        LOG.info("Failovers (" + failovers + ") exceeded maximum allowed ("
+        return new RetryAction(RetryAction.RetryDecision.FAIL, 0,
+            "failovers (" + failovers + ") exceeded maximum allowed ("
             + maxFailovers + ")");
-        return RetryAction.FAIL;
       }
       
       if (e instanceof ConnectException ||
           e instanceof NoRouteToHostException ||
           e instanceof UnknownHostException ||
-          e instanceof StandbyException) {
-        return RetryAction.FAILOVER_AND_RETRY;
+          e instanceof StandbyException ||
+          isWrappedStandbyException(e)) {
+        return new RetryAction(
+            RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+            // retry immediately if this is our first failover, sleep otherwise
+            failovers == 0 ? 0 :
+                calculateExponentialTime(delayMillis, failovers, maxDelayBase));
       } else if (e instanceof SocketException ||
-                 e instanceof IOException) {
+                 (e instanceof IOException && !(e instanceof RemoteException))) {
         if (isMethodIdempotent) {
           return RetryAction.FAILOVER_AND_RETRY;
         } else {
-          return RetryAction.FAIL;
+          return new RetryAction(RetryAction.RetryDecision.FAIL, 0,
+              "the invoked method is not idempotent, and unable to determine " +
+              "whether it was invoked");
         }
       } else {
         return fallbackPolicy.shouldRetry(e, retries, failovers,
@@ -330,4 +342,34 @@ public class RetryPolicies {
     }
     
   }
+
+  /**
+   * Return a value equal to <code>time</code> scaled up exponentially as a
+   * function of <code>retries</code>, then jittered by a randomly chosen
+   * factor between 0.5 and 1.5.
+   * 
+   * @param time the base amount of time to work with
+   * @param retries the number of retries that have occurred so far
+   * @param cap value at which to cap the base sleep time
+   * @return an amount of time to sleep
+   */
+  @VisibleForTesting
+  public static long calculateExponentialTime(long time, int retries,
+      long cap) {
+    long baseTime = Math.min(time * ((long)1 << retries), cap);
+    return (long) (baseTime * (RAND.nextFloat() + 0.5));
+  }
+
+  private static long calculateExponentialTime(long time, int retries) {
+    return calculateExponentialTime(time, retries, Long.MAX_VALUE);
+  }
+  
+  private static boolean isWrappedStandbyException(Exception e) {
+    if (!(e instanceof RemoteException)) {
+      return false;
+    }
+    Exception unwrapped = ((RemoteException)e).unwrapRemoteException(
+        StandbyException.class);
+    return unwrapped instanceof StandbyException;
+  }
 }
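
calculateExponentialTime() doubles the base delay per failover, caps it, then jitters by a random factor in [0.5, 1.5) so that clients do not retry in lockstep. A standalone sketch of the arithmetic (constants illustrative):

    // baseTime = min(time * 2^retries, cap); sleep = baseTime * [0.5, 1.5)
    public class BackoffMath {
      public static void main(String[] args) {
        long time = 1000, cap = 15000;
        for (int retries = 1; retries <= 5; retries++) {
          long baseTime = Math.min(time * (1L << retries), cap);
          System.out.printf("retries=%d -> sleep in [%d, %d) ms%n",
              retries, baseTime / 2, baseTime * 3 / 2);
        }
        // retries=3 -> [4000, 12000); retries=5 hits the cap -> [7500, 22500)
      }
    }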

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java Sun Mar 11 17:55:58 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.io.retry;
 
 import org.apache.hadoop.classification.InterfaceStability;
 
-
 /**
  * <p>
  * Specifies a policy for retrying method failures.
@@ -33,10 +32,39 @@ public interface RetryPolicy {
    * Returned by {@link RetryPolicy#shouldRetry(Exception, int, int, boolean)}.
    */
   @InterfaceStability.Evolving
-  public enum RetryAction {
-    FAIL,
-    RETRY,
-    FAILOVER_AND_RETRY
+  public static class RetryAction {
+    
+    // A few common retry policies, with no delays.
+    public static final RetryAction FAIL =
+        new RetryAction(RetryDecision.FAIL);
+    public static final RetryAction RETRY =
+        new RetryAction(RetryDecision.RETRY);
+    public static final RetryAction FAILOVER_AND_RETRY =
+        new RetryAction(RetryDecision.FAILOVER_AND_RETRY);
+    
+    public final RetryDecision action;
+    public final long delayMillis;
+    public final String reason;
+    
+    public RetryAction(RetryDecision action) {
+      this(action, 0, null);
+    }
+    
+    public RetryAction(RetryDecision action, long delayTime) {
+      this(action, delayTime, null);
+    }
+    
+    public RetryAction(RetryDecision action, long delayTime, String reason) {
+      this.action = action;
+      this.delayMillis = delayTime;
+      this.reason = reason;
+    }
+    
+    public enum RetryDecision {
+      FAIL,
+      RETRY,
+      FAILOVER_AND_RETRY
+    }
   }
   
   /**
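
Because RetryAction is now a class rather than an enum, a policy can attach a delay and a human-readable reason to its verdict. A minimal illustrative policy (not part of the patch):

    import org.apache.hadoop.io.retry.RetryPolicy;

    class FlatDelayPolicy implements RetryPolicy {
      private static final int MAX_RETRIES = 3;

      @Override
      public RetryAction shouldRetry(Exception e, int retries, int failovers,
          boolean isMethodIdempotent) throws Exception {
        if (retries >= MAX_RETRIES) {
          return new RetryAction(RetryAction.RetryDecision.FAIL, 0,
              "retries (" + retries + ") exceeded " + MAX_RETRIES);
        }
        // Retry after a flat 200ms delay.
        return new RetryAction(RetryAction.RetryDecision.RETRY, 200);
      }
    }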

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Sun Mar 11 17:55:58 2012
@@ -227,6 +227,8 @@ public class Client {
     private int maxIdleTime; //connections will be culled if it was idle for 
     //maxIdleTime msecs
     private int maxRetries; //the max. no. of retries for socket connections
+    // the max. no. of retries for socket connections on time out exceptions
+    private int maxRetriesOnSocketTimeouts;
     private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private boolean doPing; //do we need to send ping message
     private int pingInterval; // how often sends ping to the server in msecs
@@ -250,6 +252,7 @@ public class Client {
       this.rpcTimeout = remoteId.getRpcTimeout();
       this.maxIdleTime = remoteId.getMaxIdleTime();
       this.maxRetries = remoteId.getMaxRetries();
+      this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
       this.tcpNoDelay = remoteId.getTcpNoDelay();
       this.doPing = remoteId.getDoPing();
       this.pingInterval = remoteId.getPingInterval();
@@ -478,11 +481,8 @@ public class Client {
           if (updateAddress()) {
             timeoutFailures = ioFailures = 0;
           }
-          /*
-           * The max number of retries is 45, which amounts to 20s*45 = 15
-           * minutes retries.
-           */
-          handleConnectionFailure(timeoutFailures++, 45, toe);
+          handleConnectionFailure(timeoutFailures++,
+              maxRetriesOnSocketTimeouts, toe);
         } catch (IOException ie) {
           if (updateAddress()) {
             timeoutFailures = ioFailures = 0;
@@ -1286,6 +1286,8 @@ public class Client {
     private final int maxIdleTime; //connections will be culled if it was idle for 
     //maxIdleTime msecs
     private final int maxRetries; //the max. no. of retries for socket connections
+    // the max. no. of retries for socket connections on time out exceptions
+    private final int maxRetriesOnSocketTimeouts;
     private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private final boolean doPing; //do we need to send ping message
     private final int pingInterval; // how often sends ping to the server in msecs
@@ -1293,8 +1295,8 @@ public class Client {
     ConnectionId(InetSocketAddress address, Class<?> protocol, 
                  UserGroupInformation ticket, int rpcTimeout,
                  String serverPrincipal, int maxIdleTime, 
-                 int maxRetries, boolean tcpNoDelay,
-                 boolean doPing, int pingInterval) {
+                 int maxRetries, int maxRetriesOnSocketTimeouts,
+                 boolean tcpNoDelay, boolean doPing, int pingInterval) {
       this.protocol = protocol;
       this.address = address;
       this.ticket = ticket;
@@ -1302,6 +1304,7 @@ public class Client {
       this.serverPrincipal = serverPrincipal;
       this.maxIdleTime = maxIdleTime;
       this.maxRetries = maxRetries;
+      this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts;
       this.tcpNoDelay = tcpNoDelay;
       this.doPing = doPing;
       this.pingInterval = pingInterval;
@@ -1335,6 +1338,11 @@ public class Client {
       return maxRetries;
     }
     
+    /** max connection retries on socket time outs */
+    public int getMaxRetriesOnSocketTimeouts() {
+      return maxRetriesOnSocketTimeouts;
+    }
+    
     boolean getTcpNoDelay() {
       return tcpNoDelay;
     }
@@ -1369,6 +1377,9 @@ public class Client {
               CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT),
           conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
               CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT),
+          conf.getInt(
+            CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+            CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT),
           conf.getBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
               CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT),
           doPing, 
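
The removed comment captured the arithmetic behind the old constant: 45 retries at the default 20s connect timeout bounds the connect phase at about 15 minutes, far too long for a failover client. A sketch of the trade-off (the per-attempt timeout is assumed):

    // Worst-case connect phase = attempts x per-attempt timeout.
    int perAttemptTimeoutSecs = 20; // assumed default IPC connect timeout
    int retries = 45;               // ipc.client.connect.max.retries.on.timeouts
    System.out.println((retries * perAttemptTimeoutSecs) / 60 + " minutes"); // "15 minutes"
    // An HA client might set the key to 3 to block for ~1 minute instead.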

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java Sun Mar 11 17:55:58 2012
@@ -40,6 +40,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
 import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
 import org.apache.hadoop.net.NetUtils;
@@ -530,9 +531,24 @@ public class RPC {
    * Returns the server address for a given proxy.
    */
   public static InetSocketAddress getServerAddress(Object proxy) {
+    return getConnectionIdForProxy(proxy).getAddress();
+  }
+
+  /**
+   * Return the connection ID of the given object. If the provided object is in
+   * fact a protocol translator, we'll get the connection ID of the underlying
+   * proxy object.
+   * 
+   * @param proxy the proxy object to get the connection ID of.
+   * @return the connection ID for the provided proxy object.
+   */
+  public static ConnectionId getConnectionIdForProxy(Object proxy) {
+    if (proxy instanceof ProtocolTranslator) {
+      proxy = ((ProtocolTranslator)proxy).getUnderlyingProxyObject();
+    }
     RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
         .getInvocationHandler(proxy);
-    return inv.getConnectionId().getAddress();
+    return inv.getConnectionId();
   }
    
   /**
@@ -564,6 +580,12 @@ public class RPC {
    * @param proxy the RPC proxy object to be stopped
    */
   public static void stopProxy(Object proxy) {
+    if (proxy instanceof ProtocolTranslator) {
+      RPC.stopProxy(((ProtocolTranslator)proxy)
+          .getUnderlyingProxyObject());
+      return;
+    }
+    
     InvocationHandler invocationHandler = null;
     try {
       invocationHandler = Proxy.getInvocationHandler(proxy);
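
Both helpers now unwrap translator objects before reaching for the invocation handler, since a PB translator is a plain object, not a java.lang.reflect.Proxy. A sketch of the pattern a translator follows (the class is hypothetical; the method is the one on the ProtocolTranslator interface added in this change):

    import org.apache.hadoop.ipc.ProtocolTranslator;

    // Hypothetical client-side translator wrapping a PB proxy.
    class MyProtocolTranslatorPB implements ProtocolTranslator {
      private final Object rpcProxy; // the real java.lang.reflect.Proxy

      MyProtocolTranslatorPB(Object rpcProxy) {
        this.rpcProxy = rpcProxy;
      }

      @Override
      public Object getUnderlyingProxyObject() {
        // RPC.stopProxy() and RPC.getConnectionIdForProxy() call this to
        // reach the real proxy beneath the translation layer.
        return rpcProxy;
      }
    }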

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Sun Mar 11 17:55:58 2012
@@ -1671,6 +1671,10 @@ public abstract class Server {
               // on the server side, as opposed to just a normal exceptional
               // result.
               LOG.warn(logMsg, e);
+            } else if (e instanceof StandbyException) {
+              // Don't log the whole stack trace of these exceptions.
+              // Way too noisy!
+              LOG.info(logMsg);
             } else {
               LOG.info(logMsg, e);
             }

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/StandbyException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/StandbyException.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/StandbyException.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/StandbyException.java Sun Mar 11 17:55:58 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.ipc;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
@@ -24,7 +26,7 @@ import org.apache.hadoop.classification.
  * set of servers in which only a subset may be active.
  */
 @InterfaceStability.Evolving
-public class StandbyException extends Exception {
+public class StandbyException extends IOException {
   static final long serialVersionUID = 0x12308AD010L;
   public StandbyException(String msg) {
     super(msg);

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java Sun Mar 11 17:55:58 2012
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.List;
 import java.util.ServiceLoader;
@@ -449,6 +450,27 @@ public class SecurityUtil {
   }
   
   /**
+   * Perform the given action as the daemon's login user. If the login
+   * user cannot be determined, this will log a FATAL error and exit
+   * the whole JVM.
+   */
+  public static <T> T doAsLoginUserOrFatal(PrivilegedAction<T> action) { 
+    if (UserGroupInformation.isSecurityEnabled()) {
+      UserGroupInformation ugi = null;
+      try { 
+        ugi = UserGroupInformation.getLoginUser();
+      } catch (IOException e) {
+        LOG.fatal("Exception while getting login user", e);
+        e.printStackTrace();
+        Runtime.getRuntime().exit(-1);
+      }
+      return ugi.doAs(action);
+    } else {
+      return action.run();
+    }
+  }
+
+  /**
    * Resolves a host subject to the security requirements determined by
    * hadoop.security.token.service.use_ip.
    * 
@@ -597,7 +619,7 @@ public class SecurityUtil {
     void setSearchDomains(String ... domains) {
       searchDomains = Arrays.asList(domains);
     }
-  }  
+  }
 
   public static void initKrb5CipherSuites() {
     if (UserGroupInformation.isSecurityEnabled()) {
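
A minimal usage sketch for the new helper; the action body is illustrative:

    import java.security.PrivilegedAction;
    import org.apache.hadoop.security.SecurityUtil;

    Boolean renewed = SecurityUtil.doAsLoginUserOrFatal(
        new PrivilegedAction<Boolean>() {
          @Override
          public Boolean run() {
            // privileged work as the daemon's login user, e.g. an RPC
            // that must be issued as the service principal
            return Boolean.TRUE;
          }
        });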

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java Sun Mar 11 17:55:58 2012
@@ -40,6 +40,8 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.util.Daemon;
 
+import com.google.common.base.Preconditions;
+
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
 @InterfaceStability.Evolving
 public abstract 
@@ -84,6 +86,12 @@ extends AbstractDelegationTokenIdentifie
   private Thread tokenRemoverThread;
   protected volatile boolean running;
 
+  /**
+   * If the delegation token update thread holds this lock, it will
+   * not get interrupted.
+   */
+  protected Object noInterruptsLock = new Object();
+
   public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
       long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
       long delegationTokenRemoverScanInterval) {
@@ -95,6 +103,7 @@ extends AbstractDelegationTokenIdentifie
 
   /** should be called before this object is used */
   public void startThreads() throws IOException {
+    Preconditions.checkState(!running);
     updateCurrentKey();
     synchronized (this) {
       running = true;
@@ -354,12 +363,21 @@ extends AbstractDelegationTokenIdentifie
     }
   }
 
-  public synchronized void stopThreads() {
+  public void stopThreads() {
     if (LOG.isDebugEnabled())
       LOG.debug("Stopping expired delegation token remover thread");
     running = false;
+    
     if (tokenRemoverThread != null) {
-      tokenRemoverThread.interrupt();
+      synchronized (noInterruptsLock) {
+        tokenRemoverThread.interrupt();
+      }
+      try {
+        tokenRemoverThread.join();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(
+            "Unable to join on token removal thread", e);
+      }
     }
   }
   
@@ -395,7 +413,7 @@ extends AbstractDelegationTokenIdentifie
             lastTokenCacheCleanup = now;
           }
           try {
-            Thread.sleep(5000); // 5 seconds
+            Thread.sleep(Math.min(5000, keyUpdateInterval)); // at most 5 seconds
           } catch (InterruptedException ie) {
             LOG
             .error("InterruptedExcpetion recieved for ExpiredTokenRemover thread "

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml Sun Mar 11 17:55:58 2012
@@ -208,6 +208,13 @@
    group list is separated by a blank. For example, "alice,bob users,wheel".
     A special value of "*" means all users are allowed.</description>
   </property>
+  
+  <property>
+    <name>security.ha.service.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for HAService protocol used by HAAdmin to manage the
+      active and standby states of the namenode.</description>
+  </property>
 
    <property>
       <name>security.mrhs.client.protocol.acl</name>

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml Sun Mar 11 17:55:58 2012
@@ -488,6 +488,14 @@
 </property>
 
 <property>
+  <name>ipc.client.connect.max.retries.on.timeouts</name>
+  <value>45</value>
+  <description>Indicates the number of retries a client will make on socket timeout
+               to establish a server connection.
+  </description>
+</property>
+
+<property>
   <name>ipc.server.listen.queue.size</name>
   <value>128</value>
   <description>Indicates the length of the listen queue for servers accepting
@@ -849,4 +857,30 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.ha.fencing.methods</name>
+  <value></value>
+  <description>
+    List of fencing methods to use for service fencing. May contain
+    builtin methods (e.g. shell and sshfence) or user-defined methods.
+  </description>
+</property>
+
+<property>
+  <name>dfs.ha.fencing.ssh.connect-timeout</name>
+  <value>30000</value>
+  <description>
+    SSH connection timeout, in milliseconds, to use with the builtin
+    sshfence fencer.
+  </description>
+</property>
+
+<property>
+  <name>dfs.ha.fencing.ssh.private-key-files</name>
+  <value></value>
+  <description>
+    The SSH private key files to use with the builtin sshfence fencer.
+  </description>
+</property>
+
 </configuration>

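The new keys are client-side settings and can be overridden in the usual
way. Note the scale of the retry default: with the customary 20-second ipc
connect timeout (an assumption, not stated in this file), 45 retries means
roughly 45 * 20 s = 900 s, or about 15 minutes of waiting on an unresponsive
address. A sketch of overriding the new keys (the script path and key file
are placeholders, and the newline-separated multi-method form is my reading
of how the fencer expects its list):

import org.apache.hadoop.conf.Configuration;

public class HaFencingConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Fail faster than the 45-retry default when probing a dead server.
    conf.setInt("ipc.client.connect.max.retries.on.timeouts", 3);
    // Try SSH fencing first, then fall back to a shell script
    // (placeholder path); one method per line.
    conf.set("dfs.ha.fencing.methods", "sshfence\nshell(/path/to/fence.sh)");
    conf.setInt("dfs.ha.fencing.ssh.connect-timeout", 10000);
    conf.set("dfs.ha.fencing.ssh.private-key-files", "/home/hdfs/.ssh/id_rsa");
    System.out.println(conf.get("dfs.ha.fencing.methods"));
  }
}
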
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java Sun Mar 11 17:55:58 2012
@@ -25,21 +25,23 @@ import java.util.concurrent.CountDownLat
 import org.apache.hadoop.io.retry.UnreliableImplementation.TypeOfExceptionToFailWith;
 import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
 import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Test;
 
+@SuppressWarnings("unchecked")
 public class TestFailoverProxy {
 
-  public static class FlipFlopProxyProvider implements FailoverProxyProvider {
+  public static class FlipFlopProxyProvider<T> implements FailoverProxyProvider<T> {
     
-    private Class<?> iface;
-    private Object currentlyActive;
-    private Object impl1;
-    private Object impl2;
+    private Class<T> iface;
+    private T currentlyActive;
+    private T impl1;
+    private T impl2;
     
     private int failoversOccurred = 0;
     
-    public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
-        Object standbyImpl) {
+    public FlipFlopProxyProvider(Class<T> iface, T activeImpl,
+        T standbyImpl) {
       this.iface = iface;
       this.impl1 = activeImpl;
       this.impl2 = standbyImpl;
@@ -47,7 +49,7 @@ public class TestFailoverProxy {
     }
     
     @Override
-    public Object getProxy() {
+    public T getProxy() {
       return currentlyActive;
     }
 
@@ -58,7 +60,7 @@ public class TestFailoverProxy {
     }
 
     @Override
-    public Class<?> getInterface() {
+    public Class<T> getInterface() {
       return iface;
     }
 
@@ -126,7 +128,7 @@ public class TestFailoverProxy {
         new FlipFlopProxyProvider(UnreliableInterface.class,
           new UnreliableImplementation("impl1"),
           new UnreliableImplementation("impl2")),
-        RetryPolicies.TRY_ONCE_DONT_FAIL);
+        RetryPolicies.TRY_ONCE_THEN_FAIL);
 
     unreliable.succeedsOnceThenFailsReturningString();
     try {
@@ -180,7 +182,7 @@ public class TestFailoverProxy {
     
     assertEquals("impl1", unreliable.succeedsOnceThenFailsReturningString());
     try {
-      assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningString());
+      unreliable.succeedsOnceThenFailsReturningString();
       fail("should not have succeeded twice");
     } catch (IOException e) {
       // Make sure we *don't* fail over since the first implementation threw an
@@ -194,6 +196,27 @@ public class TestFailoverProxy {
     assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningStringIdempotent());
   }
   
+  /**
+   * Test that if a non-idempotent void method is called and an exception
+   * is thrown, the exception is properly propagated.
+   */
+  @Test
+  public void testExceptionPropagatedForNonIdempotentVoid() throws Exception {
+    UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+    .create(UnreliableInterface.class,
+        new FlipFlopProxyProvider(UnreliableInterface.class,
+          new UnreliableImplementation("impl1", TypeOfExceptionToFailWith.IO_EXCEPTION),
+          new UnreliableImplementation("impl2", TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION)),
+        RetryPolicies.failoverOnNetworkException(1));
+
+    try {
+      unreliable.nonIdempotentVoidFailsIfIdentifierDoesntMatch("impl2");
+      fail("did not throw an exception");
+    } catch (Exception e) {
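+      // expected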
+    }
+
+  }
+  
   private static class SynchronizedUnreliableImplementation extends UnreliableImplementation {
     
     private CountDownLatch methodLatch;
@@ -267,4 +290,62 @@ public class TestFailoverProxy {
     assertEquals("impl2", t2.result);
     assertEquals(1, proxyProvider.getFailoversOccurred());
   }
+
+  /**
+   * Ensure that when all configured services are throwing StandbyException,
+   * we fail over back and forth between them until one is no longer
+   * throwing StandbyException.
+   */
+  @Test
+  public void testFailoverBetweenMultipleStandbys()
+      throws UnreliableException, StandbyException, IOException {
+    
+    final long millisToSleep = 10000;
+    
+    final UnreliableImplementation impl1 = new UnreliableImplementation("impl1",
+        TypeOfExceptionToFailWith.STANDBY_EXCEPTION);
+    FlipFlopProxyProvider proxyProvider = new FlipFlopProxyProvider(
+        UnreliableInterface.class,
+        impl1,
+        new UnreliableImplementation("impl2",
+            TypeOfExceptionToFailWith.STANDBY_EXCEPTION));
+    
+    final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+      .create(UnreliableInterface.class, proxyProvider,
+          RetryPolicies.failoverOnNetworkException(
+              RetryPolicies.TRY_ONCE_THEN_FAIL, 10, 1000, 10000));
+    
+    new Thread() {
+      @Override
+      public void run() {
+        ThreadUtil.sleepAtLeastIgnoreInterrupts(millisToSleep);
+        impl1.setIdentifier("renamed-impl1");
+      }
+    }.start();
+    
+    String result = unreliable.failsIfIdentifierDoesntMatch("renamed-impl1");
+    assertEquals("renamed-impl1", result);
+  }
+  
+  /**
+   * Ensure that normal IO exceptions don't result in a failover.
+   */
+  @Test
+  public void testExpectedIOException() {
+    UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
+    .create(UnreliableInterface.class,
+        new FlipFlopProxyProvider(UnreliableInterface.class,
+          new UnreliableImplementation("impl1", TypeOfExceptionToFailWith.REMOTE_EXCEPTION),
+          new UnreliableImplementation("impl2", TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION)),
+          RetryPolicies.failoverOnNetworkException(
+              RetryPolicies.TRY_ONCE_THEN_FAIL, 10, 1000, 10000));
+    
+    try {
+      unreliable.failsIfIdentifierDoesntMatch("no-such-identifier");
+      fail("Should have thrown *some* exception");
+    } catch (Exception e) {
+      assertTrue("Expected IOE but got " + e.getClass(),
+          e instanceof IOException);
+    }
+  }
 }
\ No newline at end of file

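For readers following the retry changes: the wiring these tests exercise is
RetryProxy.create() over a FailoverProxyProvider plus a failover-aware
policy. A condensed sketch using the test doubles above; the comments on
the last two failoverOnNetworkException arguments are my reading of them
(base and maximum sleeps between failovers), not something this file states:

package org.apache.hadoop.io.retry;

// A sketch, assumed to sit in org.apache.hadoop.io.retry next to the tests,
// so UnreliableInterface and UnreliableImplementation resolve directly.
public class FailoverWiringSketch {
  public static UnreliableInterface build() {
    return (UnreliableInterface) RetryProxy.create(
        UnreliableInterface.class,
        new TestFailoverProxy.FlipFlopProxyProvider<UnreliableInterface>(
            UnreliableInterface.class,
            new UnreliableImplementation("active"),
            new UnreliableImplementation("standby")),
        RetryPolicies.failoverOnNetworkException(
            RetryPolicies.TRY_ONCE_THEN_FAIL, // policy between failovers
            10,      // maximum number of failovers
            1000,    // base sleep between failovers, in ms (assumed meaning)
            10000)); // cap on that sleep, in ms (assumed meaning)
  }
}
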
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestRetryProxy.java Sun Mar 11 17:55:58 2012
@@ -19,7 +19,6 @@
 package org.apache.hadoop.io.retry;
 
 import static org.apache.hadoop.io.retry.RetryPolicies.RETRY_FOREVER;
-import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_DONT_FAIL;
 import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_THEN_FAIL;
 import static org.apache.hadoop.io.retry.RetryPolicies.retryByException;
 import static org.apache.hadoop.io.retry.RetryPolicies.retryByRemoteException;
@@ -59,19 +58,6 @@ public class TestRetryProxy extends Test
     }
   }
   
-  public void testTryOnceDontFail() throws UnreliableException {
-    UnreliableInterface unreliable = (UnreliableInterface)
-      RetryProxy.create(UnreliableInterface.class, unreliableImpl, TRY_ONCE_DONT_FAIL);
-    unreliable.alwaysSucceeds();
-    unreliable.failsOnceThenSucceeds();
-    try {
-      unreliable.failsOnceThenSucceedsWithReturnValue();
-      fail("Should fail");
-    } catch (UnreliableException e) {
-      // expected
-    }
-  }
-  
   public void testRetryForever() throws UnreliableException {
     UnreliableInterface unreliable = (UnreliableInterface)
       RetryProxy.create(UnreliableInterface.class, unreliableImpl, RETRY_FOREVER);

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableImplementation.java Sun Mar 11 17:55:58 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.io.retry;
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 
@@ -37,7 +38,8 @@ public class UnreliableImplementation im
   public static enum TypeOfExceptionToFailWith {
     UNRELIABLE_EXCEPTION,
     STANDBY_EXCEPTION,
-    IO_EXCEPTION
+    IO_EXCEPTION,
+    REMOTE_EXCEPTION
   }
   
   public UnreliableImplementation() {
@@ -48,6 +50,10 @@ public class UnreliableImplementation im
     this(identifier, TypeOfExceptionToFailWith.UNRELIABLE_EXCEPTION);
   }
   
+  public void setIdentifier(String identifier) {
+    this.identifier = identifier;
+  }
+  
   public UnreliableImplementation(String identifier,
       TypeOfExceptionToFailWith exceptionToFailWith) {
     this.identifier = identifier;
@@ -91,14 +97,7 @@ public class UnreliableImplementation im
     if (succeedsOnceThenFailsCount++ < 1) {
       return identifier;
     } else {
-      switch (exceptionToFailWith) {
-      case STANDBY_EXCEPTION:
-        throw new StandbyException(identifier);
-      case UNRELIABLE_EXCEPTION:
-        throw new UnreliableException(identifier);
-      case IO_EXCEPTION:
-        throw new IOException(identifier);
-      }
+      throwAppropriateException(exceptionToFailWith, identifier);
       return null;
     }
   }
@@ -109,16 +108,8 @@ public class UnreliableImplementation im
     if (succeedsTenTimesThenFailsCount++ < 10) {
       return identifier;
     } else {
-      switch (exceptionToFailWith) {
-      case STANDBY_EXCEPTION:
-        throw new StandbyException(identifier);
-      case UNRELIABLE_EXCEPTION:
-        throw new UnreliableException(identifier);
-      case IO_EXCEPTION:
-        throw new IOException(identifier);
-      default:
-        throw new RuntimeException(identifier);
-      }
+      throwAppropriateException(exceptionToFailWith, identifier);
+      return null;
     }
   }
 
@@ -128,16 +119,8 @@ public class UnreliableImplementation im
     if (succeedsOnceThenFailsIdempotentCount++ < 1) {
       return identifier;
     } else {
-      switch (exceptionToFailWith) {
-      case STANDBY_EXCEPTION:
-        throw new StandbyException(identifier);
-      case UNRELIABLE_EXCEPTION:
-        throw new UnreliableException(identifier);
-      case IO_EXCEPTION:
-        throw new IOException(identifier);
-      default:
-        throw new RuntimeException(identifier);
-      }
+      throwAppropriateException(exceptionToFailWith, identifier);
+      return null;
     }
   }
 
@@ -147,17 +130,38 @@ public class UnreliableImplementation im
     if (this.identifier.equals(identifier)) {
       return identifier;
     } else {
-      switch (exceptionToFailWith) {
-      case STANDBY_EXCEPTION:
-        throw new StandbyException(identifier);
-      case UNRELIABLE_EXCEPTION:
-        throw new UnreliableException(identifier);
-      case IO_EXCEPTION:
-        throw new IOException(identifier);
-      default:
-        throw new RuntimeException(identifier);
-      }
+      String message = "expected '" + this.identifier + "' but received '" +
+          identifier + "'";
+      throwAppropriateException(exceptionToFailWith, message);
+      return null;
+    }
+  }
+  
+  @Override
+  public void nonIdempotentVoidFailsIfIdentifierDoesntMatch(String identifier)
+      throws UnreliableException, StandbyException, IOException {
+    if (this.identifier.equals(identifier)) {
+      return;
+    } else {
+      String message = "expected '" + this.identifier + "' but received '" +
+          identifier + "'";
+      throwAppropriateException(exceptionToFailWith, message);
     }
   }
 
+  private static void throwAppropriateException(TypeOfExceptionToFailWith eType,
+      String message) throws UnreliableException, StandbyException, IOException {
+    switch (eType) {
+    case STANDBY_EXCEPTION:
+      throw new StandbyException(message);
+    case UNRELIABLE_EXCEPTION:
+      throw new UnreliableException(message);
+    case IO_EXCEPTION:
+      throw new IOException(message);
+    case REMOTE_EXCEPTION:
+      throw new RemoteException(IOException.class.getName(), message);
+    default:
+      throw new RuntimeException(message);
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/UnreliableInterface.java Sun Mar 11 17:55:58 2012
@@ -67,4 +67,7 @@ public interface UnreliableInterface {
   @Idempotent
   public String failsIfIdentifierDoesntMatch(String identifier)
       throws UnreliableException, StandbyException, IOException;
+
+  void nonIdempotentVoidFailsIfIdentifierDoesntMatch(String identifier)
+      throws UnreliableException, StandbyException, IOException;
 }

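The new method is deliberately left without the @Idempotent annotation that
failsIfIdentifierDoesntMatch carries: the retry machinery only replays
annotated methods against another node after a fault, while an unannotated
method whose first attempt may already have executed must surface the error
to the caller. A toy protocol spelling out the contract (names are
illustrative):

import java.io.IOException;

import org.apache.hadoop.io.retry.Idempotent;

public interface ExampleProtocolSketch {
  // Safe to replay against a second node: asking twice changes nothing.
  @Idempotent
  long getFileLength(String path) throws IOException;

  // Not annotated: replaying a create after an ambiguous failure could
  // succeed twice or hide a partial first attempt, so the caller must
  // see the exception rather than a silent failover.
  void createFile(String path) throws IOException;
}
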
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1299412&r1=1299411&r2=1299412&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Sun Mar 11 17:55:58 2012
@@ -20,7 +20,9 @@ package org.apache.hadoop.ipc;
 
 import org.apache.commons.logging.*;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
@@ -590,6 +592,38 @@ public class TestIPC {
         Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes());
   }
   
+  @Test
+  public void testConnectionRetriesOnSocketTimeoutExceptions() throws Exception {
+    Configuration conf = new Configuration();
+    // set max retries to 0
+    conf.setInt(
+      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+      0);
+    assertRetriesOnSocketTimeouts(conf, 1);
+
+    // set max retries to 3
+    conf.setInt(
+      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+      3);
+    assertRetriesOnSocketTimeouts(conf, 4);
+  }
+
+  private void assertRetriesOnSocketTimeouts(Configuration conf,
+      int maxTimeoutRetries) throws IOException, InterruptedException {
+    SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
+    doThrow(new SocketTimeoutException()).when(mockFactory).createSocket();
+    Client client = new Client(IntWritable.class, conf, mockFactory);
+    InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090);
+    try {
+      client.call(new IntWritable(RANDOM.nextInt()), address, null, null, 0,
+          conf);
+      fail("Not throwing the SocketTimeoutException");
+    } catch (SocketTimeoutException e) {
+      Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
+          .createSocket();
+    }
+  }
+  
   private void doIpcVersionTest(
       byte[] requestData,
       byte[] expectedResponse) throws Exception {