You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2010/08/03 02:51:57 UTC

svn commit: r981714 - in /hadoop/common/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Client.java src/java/org/apache/hadoop/security/UserGroupInformation.java

Author: ddas
Date: Tue Aug  3 00:51:57 2010
New Revision: 981714

URL: http://svn.apache.org/viewvc?rev=981714&view=rev
Log:
HADOOP-6706. Improves the sasl failure handling due to expired tickets, and other server detected failures. Contributed by Jitendra Pandey and Devaraj Das.

Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=981714&r1=981713&r2=981714&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Tue Aug  3 00:51:57 2010
@@ -173,6 +173,9 @@ Trunk (unreleased changes)
     HADOOP-6873. using delegation token over hftp for long 
     running clients (boryas)
 
+    HADOOP-6706. Improves the sasl failure handling due to expired tickets,
+    and other server detected failures. (Jitendra Pandey and ddas via ddas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=981714&r1=981713&r2=981714&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java Tue Aug  3 00:51:57 2010
@@ -36,6 +36,7 @@ import java.io.OutputStream;
 import java.security.PrivilegedExceptionAction;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.Random;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -367,53 +368,109 @@ public class Client {
       if (saslRpcClient != null) {
         try {
           saslRpcClient.dispose();
+          saslRpcClient = null;
         } catch (IOException ignored) {
         }
       }
     }
     
+    private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
+      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+      UserGroupInformation realUser = currentUser.getRealUser();
+      if (authMethod == AuthMethod.KERBEROS && loginUser != null &&
+      // Make sure user logged in using Kerberos either keytab or TGT
+          loginUser.hasKerberosCredentials() &&
+          // relogin only in case it is the login user (e.g. JT)
+          // or superuser (like oozie).
+          (loginUser.equals(currentUser) || loginUser.equals(realUser))) {
+        return true;
+      }
+      return false;
+    }
+    
     private synchronized boolean setupSaslConnection(final InputStream in2, 
         final OutputStream out2) 
         throws IOException {
-      try {
-        saslRpcClient = new SaslRpcClient(authMethod, token,
-            serverPrincipal);
-        return saslRpcClient.saslConnect(in2, out2);
-      } catch (javax.security.sasl.SaslException je) {
-        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-        UserGroupInformation currentUser = 
-          UserGroupInformation.getCurrentUser();
-        UserGroupInformation realUser = currentUser.getRealUser();
-        if (authMethod == AuthMethod.KERBEROS && 
-          //try setting up the connection again
-          // relogin only in case it is the login user (e.g. JT)
-          // or superuser (like oozie).
-          ((currentUser != null && currentUser.equals(loginUser)) ||
-           (realUser != null && realUser.equals(loginUser)))) {
-          try {
-            //try re-login
-            if (UserGroupInformation.isLoginKeytabBased()) {
-              loginUser.reloginFromKeytab();
+      saslRpcClient = new SaslRpcClient(authMethod, token, serverPrincipal);
+      return saslRpcClient.saslConnect(in2, out2);
+    }
+    
+    private synchronized void setupConnection() throws IOException {
+      short ioFailures = 0;
+      short timeoutFailures = 0;
+      while (true) {
+        try {
+          this.socket = socketFactory.createSocket();
+          this.socket.setTcpNoDelay(tcpNoDelay);
+          // connection time out is 20s
+          NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+          this.socket.setSoTimeout(pingInterval);
+          return;
+        } catch (SocketTimeoutException toe) {
+          /*
+           * The max number of retries is 45, which amounts to 20s*45 = 15
+           * minutes retries.
+           */
+          handleConnectionFailure(timeoutFailures++, 45, toe);
+        } catch (IOException ie) {
+          handleConnectionFailure(ioFailures++, maxRetries, ie);
+        }
+      }
+    }
+
+    /**
+     * If multiple clients with the same principal try to connect to the same
+     * server at the same time, the server assumes a replay attack is in
+     * progress. This is a feature of kerberos. In order to work around this,
+     * what is done is that the client backs off randomly and tries to initiate
+     * the connection again. The other problem is to do with ticket expiry. To
+     * handle that, a relogin is attempted.
+     */
+    private synchronized void handleSaslConnectionFailure(
+        final int currRetries, final int maxRetries, final Exception ex,
+        final Random rand, final UserGroupInformation ugi) throws IOException,
+        InterruptedException {
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        public Object run() throws IOException, InterruptedException {
+          final short MAX_BACKOFF = 5000;
+          closeConnection();
+          disposeSasl();
+          if (shouldAuthenticateOverKrb()) {
+            if (currRetries < maxRetries) {
+              LOG.debug("Exception encountered while connecting to "
+                  + "the server : " + ex);
+              // try re-login
+              if (UserGroupInformation.isLoginKeytabBased()) {
+                UserGroupInformation.getLoginUser().reloginFromKeytab();
+              } else {
+                UserGroupInformation.getLoginUser().reloginFromTicketCache();
+              }
+              // have granularity of milliseconds
+              //we are sleeping with the Connection lock held but since this
+              //connection instance is being used for connecting to the server
+              //in question, it is okay
+              Thread.sleep((rand.nextInt(MAX_BACKOFF) + 1));
+              return null;
             } else {
-              loginUser.reloginFromTicketCache();
+              String msg = "Couldn't setup connection for "
+                  + UserGroupInformation.getLoginUser().getUserName() + " to "
+                  + serverPrincipal;
+              LOG.warn(msg);
+              throw (IOException) new IOException(msg).initCause(ex);
             }
-            disposeSasl();
-            saslRpcClient = new SaslRpcClient(authMethod, token,
-                serverPrincipal);
-            return saslRpcClient.saslConnect(in2, out2);
-          } catch (javax.security.sasl.SaslException jee) {
-            LOG.warn("Couldn't setup connection for " + 
-                loginUser.getUserName() +
-                " to " + serverPrincipal + " even after relogin.");
-            throw jee;
-          } catch (IOException ie) {
-            ie.initCause(je);
-            throw ie;
+          } else {
+            LOG.warn("Exception encountered while connecting to "
+                + "the server : " + ex);
           }
-        } 
-        throw je;
-      }
+          if (ex instanceof RemoteException)
+            throw (RemoteException) ex;
+          throw new IOException(ex);
+        }
+      });
     }
+
+    
     /** Connect to the server and set up the I/O streams. It then sends
      * a header to the server and starts
      * the connection thread that waits for responses.
@@ -421,81 +478,95 @@ public class Client {
     private synchronized void setupIOstreams() throws InterruptedException {
       if (socket != null || shouldCloseConnection.get()) {
         return;
-      }
-      
-      short ioFailures = 0;
-      short timeoutFailures = 0;
+      } 
       try {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Connecting to "+server);
         }
+        short numRetries = 0;
+        final short MAX_RETRIES = 5;
+        Random rand = null;
         while (true) {
-          try {
-            this.socket = socketFactory.createSocket();
-            this.socket.setTcpNoDelay(tcpNoDelay);
-            // connection time out is 20s
-            NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
-            this.socket.setSoTimeout(pingInterval);
-            break;
-          } catch (SocketTimeoutException toe) {
-            /* The max number of retries is 45,
-             * which amounts to 20s*45 = 15 minutes retries.
-             */
-            handleConnectionFailure(timeoutFailures++, 45, toe);
-          } catch (IOException ie) {
-            handleConnectionFailure(ioFailures++, maxRetries, ie);
-          }
-        }
-        InputStream inStream = NetUtils.getInputStream(socket);
-        OutputStream outStream = NetUtils.getOutputStream(socket);
-        writeRpcHeader(outStream);
-        if (useSasl) {
-          final InputStream in2 = inStream;
-          final OutputStream out2 = outStream;
-          UserGroupInformation ticket = remoteId.getTicket();
-          if (authMethod == AuthMethod.KERBEROS) {
-            if (ticket.getRealUser() != null) {
-              ticket = ticket.getRealUser();
+          setupConnection();
+          InputStream inStream = NetUtils.getInputStream(socket);
+          OutputStream outStream = NetUtils.getOutputStream(socket);
+          writeRpcHeader(outStream);
+          if (useSasl) {
+            final InputStream in2 = inStream;
+            final OutputStream out2 = outStream;
+            UserGroupInformation ticket = remoteId.getTicket();
+            if (authMethod == AuthMethod.KERBEROS) {
+              if (ticket.getRealUser() != null) {
+                ticket = ticket.getRealUser();
+              }
             }
-          }
-          if (ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
-            @Override
-            public Boolean run() throws IOException {
-              return setupSaslConnection(in2, out2);
+            boolean continueSasl = false;
+            try {
+              continueSasl = ticket
+                  .doAs(new PrivilegedExceptionAction<Boolean>() {
+                    @Override
+                    public Boolean run() throws IOException {
+                      return setupSaslConnection(in2, out2);
+                    }
+                  });
+            } catch (Exception ex) {
+              if (rand == null) {
+                rand = new Random();
+              }
+              handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
+                  ticket);
+              continue;
+            }
+            if (continueSasl) {
+              // Sasl connect is successful. Let's set up Sasl i/o streams.
+              inStream = saslRpcClient.getInputStream(inStream);
+              outStream = saslRpcClient.getOutputStream(outStream);
+            } else {
+              // fall back to simple auth because server told us so.
+              authMethod = AuthMethod.SIMPLE;
+              header = new ConnectionHeader(header.getProtocol(), header
+                  .getUgi(), authMethod);
+              useSasl = false;
             }
-          })) {
-            // Sasl connect is successful. Let's set up Sasl i/o streams.
-            inStream = saslRpcClient.getInputStream(inStream);
-            outStream = saslRpcClient.getOutputStream(outStream);
+          }
+        
+          if (doPing) {
+            this.in = new DataInputStream(new BufferedInputStream(
+                new PingInputStream(inStream)));
           } else {
-            // fall back to simple auth because server told us so.
-            authMethod = AuthMethod.SIMPLE;
-            header = new ConnectionHeader(header.getProtocol(),
-                header.getUgi(), authMethod);
-            useSasl = false;
+            this.in = new DataInputStream(new BufferedInputStream(inStream));
           }
-        }
-        if (doPing) {
-          this.in = new DataInputStream(new BufferedInputStream
-            (new PingInputStream(inStream)));
-        } else {
-          this.in = new DataInputStream(new BufferedInputStream
-            (inStream));
-        }
-        this.out = new DataOutputStream
-            (new BufferedOutputStream(outStream));
-        writeHeader();
+          this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+          writeHeader();
 
-        // update last activity time
-        touch();
+          // update last activity time
+          touch();
 
-        // start the receiver thread after the socket connection has been set up
-        start();
+          // start the receiver thread after the socket connection has been set
+          // up
+          start();
+          return;
+        }
       } catch (IOException e) {
         markClosed(e);
         close();
       }
     }
+    
+    private void closeConnection() {
+      if (socket == null) {
+        return;
+      }
+      // close the current connection
+      try {
+        socket.close();
+      } catch (IOException e) {
+        LOG.warn("Not able to close a socket", e);
+      }
+      // set socket to null so that the next call to setupIOstreams
+      // can start the process of connect all over again.
+      socket = null;
+    }
 
     /* Handle connection failures
      *
@@ -513,17 +584,8 @@ public class Client {
      */
     private void handleConnectionFailure(
         int curRetries, int maxRetries, IOException ioe) throws IOException {
-      // close the current connection
-      if (socket != null) {
-        try {
-          socket.close();
-        } catch (IOException e) {
-          LOG.warn("Not able to close a socket", e);
-        }
-      }
-      // set socket to null so that the next call to setupIOstreams
-      // can start the process of connect all over again.
-      socket = null;
+
+      closeConnection();
 
       // throw the exception if the maximum number of retries is reached
       if (curRetries >= maxRetries) {

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java?rev=981714&r1=981713&r2=981714&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java Tue Aug  3 00:51:57 2010
@@ -232,6 +232,7 @@ public class UserGroupInformation {
   // All non-static fields must be read-only caches that come from the subject.
   private final User user;
   private final boolean isKeytab;
+  private final boolean isKrbTkt;
   
   private static final String OS_LOGIN_MODULE_NAME;
   private static final Class<? extends Principal> OS_PRINCIPAL_CLASS;
@@ -373,6 +374,15 @@ public class UserGroupInformation {
     this.subject = subject;
     this.user = subject.getPrincipals(User.class).iterator().next();
     this.isKeytab = !subject.getPrivateCredentials(KerberosKey.class).isEmpty();
+    this.isKrbTkt = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+  }
+  
+  /**
+   * checks if logged in using kerberos
+   * @return true if the subject logged via keytab or has a Kerberos TGT
+   */
+  public boolean hasKerberosCredentials() {
+    return isKeytab || isKrbTkt;
   }
 
   /**
@@ -602,7 +612,7 @@ public class UserGroupInformation {
   throws IOException {
     if (!isSecurityEnabled() || 
         user.getAuthenticationMethod() != AuthenticationMethod.KERBEROS ||
-        isKeytab)
+        !isKrbTkt)
       return;
     LoginContext login = getLogin();
     if (login == null) {
@@ -726,10 +736,9 @@ public class UserGroupInformation {
   /**
    * Create a proxy user using username of the effective user and the ugi of the
    * real user.
-   *
-   * @param effective
-   *          user, UGI for real user.
-   * @return
+   * @param user
+   * @param realUser
+   * @return proxyUser ugi
    */
   public static UserGroupInformation createProxyUser(String user,
       UserGroupInformation realUser) {