You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2010/08/03 02:51:57 UTC
svn commit: r981714 - in /hadoop/common/trunk: CHANGES.txt
src/java/org/apache/hadoop/ipc/Client.java
src/java/org/apache/hadoop/security/UserGroupInformation.java
Author: ddas
Date: Tue Aug 3 00:51:57 2010
New Revision: 981714
URL: http://svn.apache.org/viewvc?rev=981714&view=rev
Log:
HADOOP-6706. Improves the sasl failure handling due to expired tickets, and other server detected failures. Contributed by Jitendra Pandey and Devaraj Das.
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java
hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=981714&r1=981713&r2=981714&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Tue Aug 3 00:51:57 2010
@@ -173,6 +173,9 @@ Trunk (unreleased changes)
HADOOP-6873. using delegation token over hftp for long
running clients (boryas)
+ HADOOP-6706. Improves the sasl failure handling due to expired tickets,
+ and other server detected failures. (Jitendra Pandey and ddas via ddas)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=981714&r1=981713&r2=981714&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Client.java Tue Aug 3 00:51:57 2010
@@ -36,6 +36,7 @@ import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.Random;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -367,53 +368,109 @@ public class Client {
if (saslRpcClient != null) {
try {
saslRpcClient.dispose();
+ saslRpcClient = null;
} catch (IOException ignored) {
}
}
}
+ private synchronized boolean shouldAuthenticateOverKrb() throws IOException { // decides whether a Kerberos re-login is applicable
+ UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ UserGroupInformation realUser = currentUser.getRealUser(); // NOTE(review): can be null (the replaced code null-checked it); confirm equals(null) is false
+ if (authMethod == AuthMethod.KERBEROS && loginUser != null &&
+ // the login user must actually hold Kerberos credentials (keytab or ticket)
+ loginUser.hasKerberosCredentials() &&
+ // relogin only when the login user is the current user (e.g. JT)
+ // or the current user's real user (a superuser like oozie).
+ (loginUser.equals(currentUser) || loginUser.equals(realUser))) {
+ return true;
+ }
+ return false;
+ }
+
private synchronized boolean setupSaslConnection(final InputStream in2,
final OutputStream out2)
throws IOException {
- try {
- saslRpcClient = new SaslRpcClient(authMethod, token,
- serverPrincipal);
- return saslRpcClient.saslConnect(in2, out2);
- } catch (javax.security.sasl.SaslException je) {
- UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
- UserGroupInformation currentUser =
- UserGroupInformation.getCurrentUser();
- UserGroupInformation realUser = currentUser.getRealUser();
- if (authMethod == AuthMethod.KERBEROS &&
- //try setting up the connection again
- // relogin only in case it is the login user (e.g. JT)
- // or superuser (like oozie).
- ((currentUser != null && currentUser.equals(loginUser)) ||
- (realUser != null && realUser.equals(loginUser)))) {
- try {
- //try re-login
- if (UserGroupInformation.isLoginKeytabBased()) {
- loginUser.reloginFromKeytab();
+ saslRpcClient = new SaslRpcClient(authMethod, token, serverPrincipal); // a new client for every attempt, kept in the field
+ return saslRpcClient.saslConnect(in2, out2); // the caller treats true as "SASL established" and false as "fall back to simple auth"
+ }
+
+ private synchronized void setupConnection() throws IOException { // opens the socket, retrying until it connects or a retry limit is hit
+ short ioFailures = 0;
+ short timeoutFailures = 0;
+ while (true) {
+ try {
+ this.socket = socketFactory.createSocket();
+ this.socket.setTcpNoDelay(tcpNoDelay);
+ // connection time out is 20s (20000 ms)
+ NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
+ this.socket.setSoTimeout(pingInterval);
+ return;
+ } catch (SocketTimeoutException toe) {
+ /*
+ * Connect timeouts are counted separately from other I/O failures.
+ * The max number of retries is 45, which amounts to 20s*45 = 15
+ * minutes of retries. handleConnectionFailure closes the socket and
+ * throws once the limit is reached, which is what ends this loop.
+ */
+ handleConnectionFailure(timeoutFailures++, 45, toe);
+ } catch (IOException ie) {
+ handleConnectionFailure(ioFailures++, maxRetries, ie); // other I/O failures are bounded by maxRetries
+ }
+ }
+ }
+
+ /**
+ * Handles a failure that occurred while setting up a SASL connection.
+ * The current socket is closed and the SASL client is disposed first.
+ * Then, if {@code shouldAuthenticateOverKrb()} holds and
+ * {@code currRetries < maxRetries}, a re-login is attempted (from the
+ * keytab or from the ticket cache) and the thread sleeps for a random
+ * 1..MAX_BACKOFF milliseconds before returning normally, so that the
+ * caller can try again. In every other case the failure is rethrown.
+ * <p>
+ * If multiple clients with the same principal try to connect to the same
+ * server at the same time, the server assumes a replay attack is in
+ * progress. This is a feature of kerberos. In order to work around this,
+ * what is done is that the client backs off randomly and tries to initiate
+ * the connection again. The other problem is to do with ticket expiry. To
+ * handle that, a relogin is attempted.
+ *
+ * @param currRetries number of failures already handled before this one
+ * @param maxRetries number of retries allowed before giving up
+ * @param ex the failure being handled; logged, and used as the cause of
+ * any exception thrown from here
+ * @param rand source of the random back-off time
+ * @param ugi user as whom the handling is executed (via {@code doAs})
+ * @throws IOException when the retries are exhausted, or when a Kerberos
+ * re-login does not apply (a RemoteException is rethrown as is,
+ * any other failure is wrapped)
+ * @throws InterruptedException declared by the action; presumably raised
+ * when the back-off sleep is interrupted
+ */
+ private synchronized void handleSaslConnectionFailure(
+ final int currRetries, final int maxRetries, final Exception ex,
+ final Random rand, final UserGroupInformation ugi) throws IOException,
+ InterruptedException {
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ public Object run() throws IOException, InterruptedException {
+ final short MAX_BACKOFF = 5000;
+ closeConnection();
+ disposeSasl();
+ if (shouldAuthenticateOverKrb()) {
+ if (currRetries < maxRetries) {
+ LOG.debug("Exception encountered while connecting to "
+ + "the server : " + ex);
+ // try re-login, using whichever source the original login came from
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ UserGroupInformation.getLoginUser().reloginFromKeytab();
+ } else {
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ }
+ // random back-off with millisecond granularity: 1..MAX_BACKOFF ms.
+ // We are sleeping with the Connection lock held, but since this
+ // connection instance is being used for connecting to the server
+ // in question, it is okay.
+ Thread.sleep((rand.nextInt(MAX_BACKOFF) + 1));
+ return null;
} else {
- loginUser.reloginFromTicketCache();
+ String msg = "Couldn't setup connection for "
+ + UserGroupInformation.getLoginUser().getUserName() + " to "
+ + serverPrincipal;
+ LOG.warn(msg);
+ throw (IOException) new IOException(msg).initCause(ex);
}
- disposeSasl();
- saslRpcClient = new SaslRpcClient(authMethod, token,
- serverPrincipal);
- return saslRpcClient.saslConnect(in2, out2);
- } catch (javax.security.sasl.SaslException jee) {
- LOG.warn("Couldn't setup connection for " +
- loginUser.getUserName() +
- " to " + serverPrincipal + " even after relogin.");
- throw jee;
- } catch (IOException ie) {
- ie.initCause(je);
- throw ie;
+ } else {
+ LOG.warn("Exception encountered while connecting to "
+ + "the server : " + ex);
}
- }
- throw je;
- }
+ if (ex instanceof RemoteException)
+ throw (RemoteException) ex;
+ throw new IOException(ex);
+ }
+ });
}
+
+
/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
@@ -421,81 +478,95 @@ public class Client {
private synchronized void setupIOstreams() throws InterruptedException {
if (socket != null || shouldCloseConnection.get()) {
return;
- }
-
- short ioFailures = 0;
- short timeoutFailures = 0;
+ }
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
}
+ short numRetries = 0; // SASL setup failures handled so far
+ final short MAX_RETRIES = 5; // retry limit passed to handleSaslConnectionFailure
+ Random rand = null; // created lazily on the first SASL failure
while (true) {
- try {
- this.socket = socketFactory.createSocket();
- this.socket.setTcpNoDelay(tcpNoDelay);
- // connection time out is 20s
- NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
- this.socket.setSoTimeout(pingInterval);
- break;
- } catch (SocketTimeoutException toe) {
- /* The max number of retries is 45,
- * which amounts to 20s*45 = 15 minutes retries.
- */
- handleConnectionFailure(timeoutFailures++, 45, toe);
- } catch (IOException ie) {
- handleConnectionFailure(ioFailures++, maxRetries, ie);
- }
- }
- InputStream inStream = NetUtils.getInputStream(socket);
- OutputStream outStream = NetUtils.getOutputStream(socket);
- writeRpcHeader(outStream);
- if (useSasl) {
- final InputStream in2 = inStream;
- final OutputStream out2 = outStream;
- UserGroupInformation ticket = remoteId.getTicket();
- if (authMethod == AuthMethod.KERBEROS) {
- if (ticket.getRealUser() != null) {
- ticket = ticket.getRealUser();
+ setupConnection(); // opens a new socket on every pass of the loop
+ InputStream inStream = NetUtils.getInputStream(socket);
+ OutputStream outStream = NetUtils.getOutputStream(socket);
+ writeRpcHeader(outStream);
+ if (useSasl) {
+ final InputStream in2 = inStream;
+ final OutputStream out2 = outStream;
+ UserGroupInformation ticket = remoteId.getTicket();
+ if (authMethod == AuthMethod.KERBEROS) {
+ if (ticket.getRealUser() != null) {
+ ticket = ticket.getRealUser();
+ }
}
- }
- if (ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
- @Override
- public Boolean run() throws IOException {
- return setupSaslConnection(in2, out2);
+ boolean continueSasl = false;
+ try {
+ continueSasl = ticket
+ .doAs(new PrivilegedExceptionAction<Boolean>() {
+ @Override
+ public Boolean run() throws IOException {
+ return setupSaslConnection(in2, out2);
+ }
+ });
+ } catch (Exception ex) {
+ if (rand == null) {
+ rand = new Random();
+ }
+ handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand,
+ ticket);
+ continue; // the handler returned without throwing: reconnect and redo the handshake
+ }
+ if (continueSasl) {
+ // Sasl connect is successful. Let's set up Sasl i/o streams.
+ inStream = saslRpcClient.getInputStream(inStream);
+ outStream = saslRpcClient.getOutputStream(outStream);
+ } else {
+ // fall back to simple auth because server told us so.
+ authMethod = AuthMethod.SIMPLE;
+ header = new ConnectionHeader(header.getProtocol(), header
+ .getUgi(), authMethod);
+ useSasl = false;
}
- })) {
- // Sasl connect is successful. Let's set up Sasl i/o streams.
- inStream = saslRpcClient.getInputStream(inStream);
- outStream = saslRpcClient.getOutputStream(outStream);
+ }
+
+ if (doPing) {
+ this.in = new DataInputStream(new BufferedInputStream(
+ new PingInputStream(inStream)));
} else {
- // fall back to simple auth because server told us so.
- authMethod = AuthMethod.SIMPLE;
- header = new ConnectionHeader(header.getProtocol(),
- header.getUgi(), authMethod);
- useSasl = false;
+ this.in = new DataInputStream(new BufferedInputStream(inStream));
}
- }
- if (doPing) {
- this.in = new DataInputStream(new BufferedInputStream
- (new PingInputStream(inStream)));
- } else {
- this.in = new DataInputStream(new BufferedInputStream
- (inStream));
- }
- this.out = new DataOutputStream
- (new BufferedOutputStream(outStream));
- writeHeader();
+ this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+ writeHeader();
- // update last activity time
- touch();
+ // update last activity time
+ touch();
- // start the receiver thread after the socket connection has been set up
- start();
+ // start the receiver thread after the socket connection has been set
+ // up
+ start();
+ return; // connection fully set up; leave the retry loop
+ }
} catch (IOException e) {
markClosed(e);
close();
}
}
+
+ private void closeConnection() { // closes the current socket, if any, and forgets it
+ if (socket == null) {
+ return;
+ }
+ // close the current connection; a failure to close is only logged
+ try {
+ socket.close();
+ } catch (IOException e) {
+ LOG.warn("Not able to close a socket", e);
+ }
+ // set socket to null so that the next call to setupIOstreams
+ // (which returns early while socket is non-null) can start the
+ // process of connecting all over again.
+ socket = null;
+ }
/* Handle connection failures
*
@@ -513,17 +584,8 @@ public class Client {
*/
private void handleConnectionFailure(
int curRetries, int maxRetries, IOException ioe) throws IOException {
- // close the current connection
- if (socket != null) {
- try {
- socket.close();
- } catch (IOException e) {
- LOG.warn("Not able to close a socket", e);
- }
- }
- // set socket to null so that the next call to setupIOstreams
- // can start the process of connect all over again.
- socket = null;
+
+ closeConnection();
// throw the exception if the maximum number of retries is reached
if (curRetries >= maxRetries) {
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java?rev=981714&r1=981713&r2=981714&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/UserGroupInformation.java Tue Aug 3 00:51:57 2010
@@ -232,6 +232,7 @@ public class UserGroupInformation {
// All non-static fields must be read-only caches that come from the subject.
private final User user;
private final boolean isKeytab;
+ private final boolean isKrbTkt;
private static final String OS_LOGIN_MODULE_NAME;
private static final Class<? extends Principal> OS_PRINCIPAL_CLASS;
@@ -373,6 +374,15 @@ public class UserGroupInformation {
this.subject = subject;
this.user = subject.getPrincipals(User.class).iterator().next();
this.isKeytab = !subject.getPrivateCredentials(KerberosKey.class).isEmpty();
+ this.isKrbTkt = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+ }
+
+ /**
+ * Checks whether this user holds Kerberos credentials.
+ * @return true if, when this object was constructed, the subject carried
+ * a {@code KerberosKey} private credential (keytab login) or a
+ * {@code KerberosTicket} private credential
+ */
+ public boolean hasKerberosCredentials() {
+ return isKeytab || isKrbTkt;
}
/**
@@ -602,7 +612,7 @@ public class UserGroupInformation {
throws IOException {
if (!isSecurityEnabled() ||
user.getAuthenticationMethod() != AuthenticationMethod.KERBEROS ||
- isKeytab)
+ !isKrbTkt)
return;
LoginContext login = getLogin();
if (login == null) {
@@ -726,10 +736,9 @@ public class UserGroupInformation {
/**
* Create a proxy user using username of the effective user and the ugi of the
* real user.
- *
- * @param effective
- * user, UGI for real user.
- * @return
+ * @param user
+ * @param realUser
+ * @return proxyUser ugi
*/
public static UserGroupInformation createProxyUser(String user,
UserGroupInformation realUser) {