You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2013/12/19 03:04:01 UTC
svn commit: r1552205 - in
/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common: ./
src/main/docs/ src/main/java/ src/main/java/org/apache/hadoop/fs/
src/main/java/org/apache/hadoop/io/retry/
src/main/java/org/apache/hadoop/ipc/ src/mai...
Author: arp
Date: Thu Dec 19 02:03:47 2013
New Revision: 1552205
URL: http://svn.apache.org/r1552205
Log:
Merge forward from trunk to branch HDFS-2832
Modified:
hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/docs/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java
hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/core/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt Thu Dec 19 02:03:47 2013
@@ -280,6 +280,8 @@ Trunk (Unreleased)
HDFS-5471. CacheAdmin -listPools fails when user lacks permissions to view
all pools (Andrew Wang via Colin Patrick McCabe)
+ HADOOP-10044 Improve the javadoc of rpc code (sanjay Radia)
+
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
@@ -395,12 +397,16 @@ Release 2.4.0 - UNRELEASED
HADOOP-10102. Update commons IO from 2.1 to 2.4 (Akira Ajisaka via stevel)
+ HADOOP-10168. fix javadoc of ReflectionUtils#copy. (Thejas Nair via suresh)
+
+ HADOOP-10164. Allow UGI to login with a known Subject (bobby)
+
OPTIMIZATIONS
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
- HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V
- via acmurthy)
+ HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V
+ via acmurthy)
BUG FIXES
@@ -465,6 +471,16 @@ Release 2.4.0 - UNRELEASED
HADOOP-10058. TestMetricsSystemImpl#testInitFirstVerifyStopInvokedImmediately
fails on trunk (Chen He via jeagles)
+ HADOOP-8753. LocalDirAllocator throws "ArithmeticException: / by zero" when
+ there is no available space on configured local dir. (Benoy Antony via hitesh)
+
+ HADOOP-10106. Incorrect thread name in RPC log messages. (Ming Ma via jing9)
+
+ HADOOP-9611 mvn-rpmbuild against google-guice > 3.0 yields missing cglib
+ dependency (Timothy St. Clair via stevel)
+
+ HADOOP-10171. TestRPC fails intermittently on jdk7 (Mit Desai via jeagles)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt:r1551915
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1550313-1552204
Propchange: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/docs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/docs:r1535792-1536571,1536573-1552204
Propchange: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1549949-1552204
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java Thu Dec 19 02:03:47 2013
@@ -365,6 +365,10 @@ public class LocalDirAllocator {
totalAvailable += availableOnDisk[i];
}
+ if (totalAvailable == 0){
+ throw new DiskErrorException("No space available in any of the local directories.");
+ }
+
// Keep rolling the wheel till we get a valid path
Random r = new java.util.Random();
while (numDirsSearched < numDirs && returnPath == null) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java Thu Dec 19 02:03:47 2013
@@ -68,7 +68,14 @@ public class RetryPolicies {
* </p>
*/
public static final RetryPolicy RETRY_FOREVER = new RetryForever();
-
+
+ /**
+ * <p>
+ * Keep failing over forever
+ * </p>
+ */
+ public static final RetryPolicy FAILOVER_FOREVER = new FailoverForever();
+
/**
* <p>
* Keep trying a limited number of times, waiting a fixed time between attempts,
@@ -166,6 +173,14 @@ public class RetryPolicies {
return RetryAction.RETRY;
}
}
+
+ static class FailoverForever implements RetryPolicy {
+ @Override
+ public RetryAction shouldRetry(Exception e, int retries, int failovers,
+ boolean isIdempotentOrAtMostOnce) throws Exception {
+ return RetryAction.FAILOVER_AND_RETRY;
+ }
+ }
/**
* Retry up to maxRetries.
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java Thu Dec 19 02:03:47 2013
@@ -37,10 +37,24 @@ public class RpcConstants {
public static final int INVALID_RETRY_COUNT = -1;
+ /**
+ * The Rpc-connection header is as follows
+ * +----------------------------------+
+ * | "hrpc" 4 bytes |
+ * +----------------------------------+
+ * | Version (1 byte) |
+ * +----------------------------------+
+ * | Service Class (1 byte) |
+ * +----------------------------------+
+ * | AuthProtocol (1 byte) |
+ * +----------------------------------+
+ */
+
/**
* The first four bytes of Hadoop RPC connections
*/
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+ public static final int HEADER_LEN_AFTER_HRPC_PART = 3; // 3 bytes that follow
// 1 : Introduce ping and server does not throw away RPCs
// 3 : Introduce the protocol into the RPC connection header
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Thu Dec 19 02:03:47 2013
@@ -551,14 +551,14 @@ public abstract class Server {
@Override
public void run() {
- LOG.info("Starting " + getName());
+ LOG.info("Starting " + Thread.currentThread().getName());
try {
doRunLoop();
} finally {
try {
readSelector.close();
} catch (IOException ioe) {
- LOG.error("Error closing read selector in " + this.getName(), ioe);
+ LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
}
}
}
@@ -589,7 +589,7 @@ public abstract class Server {
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
- LOG.info(getName() + " unexpectedly interrupted", e);
+ LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
}
} catch (IOException ex) {
LOG.error("Error in Reader", ex);
@@ -620,7 +620,7 @@ public abstract class Server {
@Override
public void run() {
- LOG.info(getName() + ": starting");
+ LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
connectionManager.startIdleScan();
while (running) {
@@ -652,7 +652,7 @@ public abstract class Server {
closeCurrentConnection(key, e);
}
}
- LOG.info("Stopping " + this.getName());
+ LOG.info("Stopping " + Thread.currentThread().getName());
synchronized (this) {
try {
@@ -710,14 +710,14 @@ public abstract class Server {
try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
- LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
+ LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
// a WrappedRpcServerException is an exception that has been sent
// to the client, so the stacktrace is unnecessary; any other
// exceptions are unexpected internal server errors and thus the
// stacktrace should be logged
- LOG.info(getName() + ": readAndProcess from client " +
+ LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " +
c.getHostAddress() + " threw exception [" + e + "]",
(e instanceof WrappedRpcServerException) ? null : e);
count = -1; //so that the (count < 0) block is executed
@@ -740,7 +740,7 @@ public abstract class Server {
try {
acceptChannel.socket().close();
} catch (IOException e) {
- LOG.info(getName() + ":Exception in closing listener socket. " + e);
+ LOG.info(Thread.currentThread().getName() + ":Exception in closing listener socket. " + e);
}
}
for (Reader r : readers) {
@@ -773,16 +773,16 @@ public abstract class Server {
@Override
public void run() {
- LOG.info(getName() + ": starting");
+ LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
try {
doRunLoop();
} finally {
- LOG.info("Stopping " + this.getName());
+ LOG.info("Stopping " + Thread.currentThread().getName());
try {
writeSelector.close();
} catch (IOException ioe) {
- LOG.error("Couldn't close write selector in " + this.getName(), ioe);
+ LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(), ioe);
}
}
}
@@ -803,7 +803,7 @@ public abstract class Server {
doAsyncWrite(key);
}
} catch (IOException e) {
- LOG.info(getName() + ": doAsyncWrite threw exception " + e);
+ LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
}
}
long now = Time.now();
@@ -918,7 +918,7 @@ public abstract class Server {
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to " + call);
+ LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
}
//
// Send as much data as we can in the non-blocking fashion
@@ -937,7 +937,7 @@ public abstract class Server {
done = false; // more calls pending to be sent.
}
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to " + call
+ LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote " + numBytes + " bytes.");
}
} else {
@@ -965,7 +965,7 @@ public abstract class Server {
}
}
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": responding to " + call
+ LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote partial " + numBytes + " bytes.");
}
}
@@ -973,7 +973,7 @@ public abstract class Server {
}
} finally {
if (error && call != null) {
- LOG.warn(getName()+", call " + call + ": output error");
+ LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");
done = true; // error. no more data for this channel.
closeConnection(call.connection);
}
@@ -1105,6 +1105,9 @@ public abstract class Server {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
+
+ // the buffer is initialized to read the "hrpc" and after that to read
+ // the length of the Rpc-packet (i.e. 4 bytes)
this.dataLengthBuffer = ByteBuffer.allocate(4);
this.unwrappedData = null;
this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
@@ -1200,7 +1203,16 @@ public abstract class Server {
}
}
- private Throwable getCauseForInvalidToken(IOException e) {
+ /**
+ * Some exceptions ({@link RetriableException} and {@link StandbyException})
+ * that are wrapped as a cause of parameter e are unwrapped so that they can
+ * be sent as the true cause to the client side. In case of
+ * {@link InvalidToken} we go one level deeper to get the true cause.
+ *
+ * @param e the exception that may have a cause we want to unwrap.
+ * @return the true cause for some exceptions.
+ */
+ private Throwable getTrueCause(IOException e) {
Throwable cause = e;
while (cause != null) {
if (cause instanceof RetriableException) {
@@ -1223,6 +1235,18 @@ public abstract class Server {
return e;
}
+ /**
+ * Process saslMessage and send saslResponse back
+ * @param saslMessage received SASL message
+ * @throws WrappedRpcServerException setup failed due to SASL negotiation
+ * failure, premature or invalid connection context, or other state
+ * errors. This exception needs to be sent to the client. This
+ * exception will wrap {@link RetriableException},
+ * {@link InvalidToken}, {@link StandbyException} or
+ * {@link SaslException}.
+ * @throws IOException if sending reply fails
+ * @throws InterruptedException
+ */
private void saslProcess(RpcSaslProto saslMessage)
throws WrappedRpcServerException, IOException, InterruptedException {
if (saslContextEstablished) {
@@ -1239,7 +1263,7 @@ public abstract class Server {
// attempting user could be null
AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
+ attemptingUser + " (" + e.getLocalizedMessage() + ")");
- throw (IOException) getCauseForInvalidToken(e);
+ throw (IOException) getTrueCause(e);
}
if (saslServer != null && saslServer.isComplete()) {
@@ -1274,13 +1298,26 @@ public abstract class Server {
}
}
+ /**
+ * Process a saslMessage.
+ * @param saslMessage received SASL message
+ * @return the sasl response to send back to client
+ * @throws SaslException if authentication or generating response fails,
+ * or SASL protocol mixup
+ * @throws IOException if a SaslServer cannot be created
+ * @throws AccessControlException if the requested authentication type
+ * is not supported or trying to re-attempt negotiation.
+ * @throws InterruptedException
+ */
private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
- throws IOException, InterruptedException {
+ throws SaslException, IOException, AccessControlException,
+ InterruptedException {
RpcSaslProto saslResponse = null;
final SaslState state = saslMessage.getState(); // required
switch (state) {
case NEGOTIATE: {
if (sentNegotiate) {
+ // FIXME shouldn't this be SaslException?
throw new AccessControlException(
"Client already attempted negotiation");
}
@@ -1402,12 +1439,30 @@ public abstract class Server {
}
}
+ /**
+ * This method reads in a non-blocking fashion from the channel:
+ * this method is called repeatedly when data is present in the channel;
+ * when it has enough data to process one rpc it processes that rpc.
+ *
+ * On the first pass, it processes the connectionHeader,
+ * connectionContext (an outOfBand RPC) and at most one RPC request that
+ * follows that. On future passes it will process at most one RPC request.
+ *
+ * Quirky things: dataLengthBuffer (4 bytes) is used to read "hrpc" OR
+ * rpc request length.
+ *
+ * @return -1 in case of error, else num bytes read so far
+ * @throws WrappedRpcServerException - an exception that has already been
+ * sent back to the client that does not require verbose logging
+ * by the Listener thread
+ * @throws IOException - internal error that should not be returned to
+ * client, typically failure to respond to client
+ * @throws InterruptedException
+ */
public int readAndProcess()
throws WrappedRpcServerException, IOException, InterruptedException {
while (true) {
- /* Read at most one RPC. If the header is not read completely yet
- * then iterate until we read first RPC or until there is no data left.
- */
+ // dataLengthBuffer is used to read "hrpc" or the rpc-packet length
int count = -1;
if (dataLengthBuffer.remaining() > 0) {
count = channelRead(channel, dataLengthBuffer);
@@ -1416,9 +1471,11 @@ public abstract class Server {
}
if (!connectionHeaderRead) {
- //Every connection is expected to send the header.
+ // Every connection is expected to send the header;
+ // so far we read "hrpc" of the connection header.
if (connectionHeaderBuf == null) {
- connectionHeaderBuf = ByteBuffer.allocate(3);
+ // for the bytes that follow "hrpc", in the connection header
+ connectionHeaderBuf = ByteBuffer.allocate(HEADER_LEN_AFTER_HRPC_PART);
}
count = channelRead(channel, connectionHeaderBuf);
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
@@ -1451,27 +1508,30 @@ public abstract class Server {
// this may switch us into SIMPLE
authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));
- dataLengthBuffer.clear();
+ dataLengthBuffer.clear(); // clear to next read rpc packet len
connectionHeaderBuf = null;
connectionHeaderRead = true;
- continue;
+ continue; // connection header read, now read 4 bytes rpc packet len
}
- if (data == null) {
+ if (data == null) { // just read 4 bytes - length of RPC packet
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
checkDataLength(dataLength);
+ // Set buffer for reading EXACTLY the RPC-packet length and no more.
data = ByteBuffer.allocate(dataLength);
}
-
+ // Now read the RPC packet
count = channelRead(channel, data);
if (data.remaining() == 0) {
- dataLengthBuffer.clear();
+ dataLengthBuffer.clear(); // to read length of future rpc packets
data.flip();
boolean isHeaderRead = connectionContextRead;
processOneRpc(data.array());
data = null;
+ // the last rpc-request we processed could have simply been the
+ // connectionContext; if so continue to read the first RPC.
if (!isHeaderRead) {
continue;
}
@@ -1508,8 +1568,16 @@ public abstract class Server {
return authProtocol;
}
+ /**
+ * Process the Sasl's Negotiate request, including the optimization of
+ * accelerating token negotiation.
+ * @return the response to Negotiate request - the list of enabled
+ * authMethods and challenge if the TOKENS are supported.
+ * @throws SaslException - if attempt to generate challenge fails.
+ * @throws IOException - if it fails to create the SASL server for Tokens
+ */
private RpcSaslProto buildSaslNegotiateResponse()
- throws IOException, InterruptedException {
+ throws InterruptedException, SaslException, IOException {
RpcSaslProto negotiateMessage = negotiateResponse;
// accelerate token negotiation by sending initial challenge
// in the negotiation response
@@ -1635,8 +1703,11 @@ public abstract class Server {
/**
* Process a wrapped RPC Request - unwrap the SASL packet and process
* each embedded RPC request
- * @param buf - SASL wrapped request of one or more RPCs
+ * @param inBuf - SASL wrapped request of one or more RPCs
* @throws IOException - SASL packet cannot be unwrapped
+ * @throws WrappedRpcServerException - an exception that has already been
+ * sent back to the client that does not require verbose logging
+ * by the Listener thread
* @throws InterruptedException
*/
private void unwrapPacketAndProcessRpcs(byte[] inBuf)
@@ -1677,13 +1748,21 @@ public abstract class Server {
}
/**
- * Process an RPC Request - handle connection setup and decoding of
- * request into a Call
+ * Process one RPC Request from buffer read from socket stream
+ * - decode the rpc into an rpc-Call
+ * - handle out-of-band RPC requests such as the initial connectionContext
+ * - A successfully decoded RpcCall will be deposited in RPC-Q and
+ * its response will be sent later when the request is processed.
+ *
+ * Prior to this call the connectionHeader ("hrpc...") has been handled and
+ * if SASL then SASL has been established and the buf we are passed
+ * has been unwrapped from SASL.
+ *
* @param buf - contains the RPC request header and the rpc request
* @throws IOException - internal error that should not be returned to
* client, typically failure to respond to client
- * @throws WrappedRpcServerException - an exception to be sent back to
- * the client that does not require verbose logging by the
+ * @throws WrappedRpcServerException - an exception that is sent back to the
+ * client in this method and does not require verbose logging by the
* Listener thread
* @throws InterruptedException
*/
@@ -1753,8 +1832,11 @@ public abstract class Server {
}
/**
- * Process an RPC Request - the connection headers and context must
- * have been already read
+ * Process an RPC Request
+ * - the connection headers and context must have been already read.
+ * - Based on the rpcKind, decode the rpcRequest.
+ * - A successfully decoded RpcCall will be deposited in RPC-Q and
+ * its response will be sent later when the request is processed.
* @param header - RPC request header
* @param dis - stream to request payload
* @throws WrappedRpcServerException - due to fatal rpc layer issues such
@@ -1803,7 +1885,8 @@ public abstract class Server {
* @param dis - stream to request payload
* @throws WrappedRpcServerException - setup failed due to SASL
* negotiation failure, premature or invalid connection context,
- * or other state errors
+ * or other state errors. This exception needs to be sent to the
+ * client.
* @throws IOException - failed to send a response back to the client
* @throws InterruptedException
*/
@@ -1928,7 +2011,7 @@ public abstract class Server {
@Override
public void run() {
- LOG.debug(getName() + ": starting");
+ LOG.debug(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
ByteArrayOutputStream buf =
new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
@@ -1936,7 +2019,7 @@ public abstract class Server {
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": " + call + " for RpcKind " + call.rpcKind);
+ LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
}
String errorClass = null;
String error = null;
@@ -1969,7 +2052,7 @@ public abstract class Server {
if (e instanceof UndeclaredThrowableException) {
e = e.getCause();
}
- String logMsg = getName() + ", call " + call + ": error: " + e;
+ String logMsg = Thread.currentThread().getName() + ", call " + call + ": error: " + e;
if (e instanceof RuntimeException || e instanceof Error) {
// These exception types indicate something is probably wrong
// on the server side, as opposed to just a normal exceptional
@@ -2018,13 +2101,13 @@ public abstract class Server {
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
- LOG.info(getName() + " unexpectedly interrupted", e);
+ LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
}
} catch (Exception e) {
- LOG.info(getName() + " caught an exception", e);
+ LOG.info(Thread.currentThread().getName() + " caught an exception", e);
}
}
- LOG.debug(getName() + ": exiting");
+ LOG.debug(Thread.currentThread().getName() + ": exiting");
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java Thu Dec 19 02:03:47 2013
@@ -477,7 +477,7 @@ public class UserGroupInformation {
private static final AppConfigurationEntry[] SIMPLE_CONF =
new AppConfigurationEntry[]{OS_SPECIFIC_LOGIN, HADOOP_LOGIN};
-
+
private static final AppConfigurationEntry[] USER_KERBEROS_CONF =
new AppConfigurationEntry[]{OS_SPECIFIC_LOGIN, USER_KERBEROS_LOGIN,
HADOOP_LOGIN};
@@ -682,44 +682,59 @@ public class UserGroupInformation {
public synchronized
static UserGroupInformation getLoginUser() throws IOException {
if (loginUser == null) {
- ensureInitialized();
- try {
- Subject subject = new Subject();
- LoginContext login =
- newLoginContext(authenticationMethod.getLoginAppName(),
- subject, new HadoopConfiguration());
- login.login();
- UserGroupInformation realUser = new UserGroupInformation(subject);
- realUser.setLogin(login);
- realUser.setAuthenticationMethod(authenticationMethod);
- realUser = new UserGroupInformation(login.getSubject());
- // If the HADOOP_PROXY_USER environment variable or property
- // is specified, create a proxy user as the logged in user.
- String proxyUser = System.getenv(HADOOP_PROXY_USER);
- if (proxyUser == null) {
- proxyUser = System.getProperty(HADOOP_PROXY_USER);
- }
- loginUser = proxyUser == null ? realUser : createProxyUser(proxyUser, realUser);
-
- String fileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
- if (fileLocation != null) {
- // Load the token storage file and put all of the tokens into the
- // user. Don't use the FileSystem API for reading since it has a lock
- // cycle (HADOOP-9212).
- Credentials cred = Credentials.readTokenStorageFile(
- new File(fileLocation), conf);
- loginUser.addCredentials(cred);
- }
- loginUser.spawnAutoRenewalThreadForUserCreds();
- } catch (LoginException le) {
- LOG.debug("failure to login", le);
- throw new IOException("failure to login", le);
+ loginUserFromSubject(null);
+ }
+ return loginUser;
+ }
+
+ /**
+ * Log in a user using the given subject
+ * @param subject the subject to use when logging in a user, or null to
+ * create a new subject.
+ * @throws IOException if login fails
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public synchronized
+ static void loginUserFromSubject(Subject subject) throws IOException {
+ ensureInitialized();
+ try {
+ if (subject == null) {
+ subject = new Subject();
+ }
+ LoginContext login =
+ newLoginContext(authenticationMethod.getLoginAppName(),
+ subject, new HadoopConfiguration());
+ login.login();
+ UserGroupInformation realUser = new UserGroupInformation(subject);
+ realUser.setLogin(login);
+ realUser.setAuthenticationMethod(authenticationMethod);
+ realUser = new UserGroupInformation(login.getSubject());
+ // If the HADOOP_PROXY_USER environment variable or property
+ // is specified, create a proxy user as the logged in user.
+ String proxyUser = System.getenv(HADOOP_PROXY_USER);
+ if (proxyUser == null) {
+ proxyUser = System.getProperty(HADOOP_PROXY_USER);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("UGI loginUser:"+loginUser);
+ loginUser = proxyUser == null ? realUser : createProxyUser(proxyUser, realUser);
+
+ String fileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+ if (fileLocation != null) {
+ // Load the token storage file and put all of the tokens into the
+ // user. Don't use the FileSystem API for reading since it has a lock
+ // cycle (HADOOP-9212).
+ Credentials cred = Credentials.readTokenStorageFile(
+ new File(fileLocation), conf);
+ loginUser.addCredentials(cred);
}
+ loginUser.spawnAutoRenewalThreadForUserCreds();
+ } catch (LoginException le) {
+ LOG.debug("failure to login", le);
+ throw new IOException("failure to login", le);
}
- return loginUser;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("UGI loginUser:"+loginUser);
+ }
}
@InterfaceAudience.Private
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java Thu Dec 19 02:03:47 2013
@@ -275,8 +275,9 @@ public class ReflectionUtils {
/**
* Make a copy of the writable object using serialization to a buffer
- * @param dst the object to copy from
- * @param src the object to copy into, which is destroyed
+ * @param src the object to copy from
+ * @param dst the object to copy into, which is destroyed
+ * @return dst param (the copy)
* @throws IOException
*/
@SuppressWarnings("unchecked")
Propchange: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/core/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/core:r1535792-1536571,1536573-1552204
Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Thu Dec 19 02:03:47 2013
@@ -957,6 +957,7 @@ public class TestRPC {
proxy.sleep(pingInterval*4);
} finally {
if (proxy != null) RPC.stopProxy(proxy);
+ server.stop();
}
}