You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2014/03/14 22:19:29 UTC
svn commit: r1577720 - in
/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: ./
src/ src/main/java/ src/main/java/org/apache/hadoop/ipc/
src/test/java/org/apache/hadoop/ipc/
Author: szetszwo
Date: Fri Mar 14 21:19:29 2014
New Revision: 1577720
URL: http://svn.apache.org/r1577720
Log:
svn merge -c 1577710 from trunk for HADOOP-10407. Fix the javac warnings in org.apache.hadoop.ipc package.
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/ (props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/ (props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/ (props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1577710
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1577720&r1=1577719&r2=1577720&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Fri Mar 14 21:19:29 2014
@@ -120,6 +120,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10337 ConcurrentModificationException from
MetricsDynamicMBeanBase.createMBeanInfo() (Liang Xie via stack)
+ HADOOP-10407. Fix the javac warnings in org.apache.hadoop.ipc package.
+ (szetszwo)
+
BREAKDOWN OF HADOOP-10184 SUBTASKS AND RELATED JIRAS
HADOOP-10185. FileSystem API for ACLs. (cnauroth)
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1577710
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src:r1577710
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1577710
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java?rev=1577720&r1=1577719&r2=1577720&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java Fri Mar 14 21:19:29 2014
@@ -18,15 +18,13 @@
package org.apache.hadoop.ipc;
+import java.lang.reflect.Constructor;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.TimeUnit;
-
-import java.lang.reflect.Constructor;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configuration;
/**
@@ -35,13 +33,19 @@ import org.apache.hadoop.conf.Configurat
public class CallQueueManager<E> {
public static final Log LOG = LogFactory.getLog(CallQueueManager.class);
+ @SuppressWarnings("unchecked")
+ static <E> Class<? extends BlockingQueue<E>> convertQueueClass(
+ Class<?> queneClass, Class<E> elementClass) {
+ return (Class<? extends BlockingQueue<E>>)queneClass;
+ }
+
// Atomic refs point to active callQueue
// We have two so we can better control swapping
private final AtomicReference<BlockingQueue<E>> putRef;
private final AtomicReference<BlockingQueue<E>> takeRef;
- public CallQueueManager(Class backingClass, int maxQueueSize,
- String namespace, Configuration conf) {
+ public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
+ int maxQueueSize, String namespace, Configuration conf) {
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
maxQueueSize, namespace, conf);
this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
@@ -49,15 +53,14 @@ public class CallQueueManager<E> {
LOG.info("Using callQueue " + backingClass);
}
- @SuppressWarnings("unchecked")
- private BlockingQueue<E> createCallQueueInstance(Class theClass, int maxLen,
- String ns, Configuration conf) {
+ private <T extends BlockingQueue<E>> T createCallQueueInstance(
+ Class<T> theClass, int maxLen, String ns, Configuration conf) {
// Used for custom, configurable callqueues
try {
- Constructor ctor = theClass.getDeclaredConstructor(int.class, String.class,
+ Constructor<T> ctor = theClass.getDeclaredConstructor(int.class, String.class,
Configuration.class);
- return (BlockingQueue<E>)ctor.newInstance(maxLen, ns, conf);
+ return ctor.newInstance(maxLen, ns, conf);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
@@ -65,8 +68,8 @@ public class CallQueueManager<E> {
// Used for LinkedBlockingQueue, ArrayBlockingQueue, etc
try {
- Constructor ctor = theClass.getDeclaredConstructor(int.class);
- return (BlockingQueue<E>)ctor.newInstance(maxLen);
+ Constructor<T> ctor = theClass.getDeclaredConstructor(int.class);
+ return ctor.newInstance(maxLen);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
@@ -74,8 +77,8 @@ public class CallQueueManager<E> {
// Last attempt
try {
- Constructor ctor = theClass.getDeclaredConstructor();
- return (BlockingQueue<E>)ctor.newInstance();
+ Constructor<T> ctor = theClass.getDeclaredConstructor();
+ return ctor.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
@@ -117,8 +120,9 @@ public class CallQueueManager<E> {
* Replaces active queue with the newly requested one and transfers
* all calls to the newQ before returning.
*/
- public synchronized void swapQueue(Class queueClassToUse, int maxSize,
- String ns, Configuration conf) {
+ public synchronized void swapQueue(
+ Class<? extends BlockingQueue<E>> queueClassToUse, int maxSize,
+ String ns, Configuration conf) {
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse, maxSize,
ns, conf);
@@ -143,7 +147,7 @@ public class CallQueueManager<E> {
* This doesn't mean the queue might not fill up at some point later, but
* it should decrease the probability that we lose a call this way.
*/
- private boolean queueIsReallyEmpty(BlockingQueue q) {
+ private boolean queueIsReallyEmpty(BlockingQueue<?> q) {
boolean wasEmpty = q.isEmpty();
try {
Thread.sleep(10);
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1577720&r1=1577719&r2=1577720&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Fri Mar 14 21:19:29 2014
@@ -541,7 +541,7 @@ public class Client {
}
private synchronized AuthMethod setupSaslConnection(final InputStream in2,
- final OutputStream out2) throws IOException, InterruptedException {
+ final OutputStream out2) throws IOException {
// Do not use Client.conf here! We must use ConnectionId.conf, since the
// Client object is cached and shared between all RPC clients, even those
// for separate services.
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1577720&r1=1577719&r2=1577720&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Fri Mar 14 21:19:29 2014
@@ -18,6 +18,11 @@
package org.apache.hadoop.ipc;
+import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
+import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -75,8 +80,6 @@ import org.apache.hadoop.fs.CommonConfig
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import static org.apache.hadoop.ipc.RpcConstants.*;
-
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
@@ -467,17 +470,24 @@ public abstract class Server {
return serviceAuthorizationManager;
}
+ static Class<? extends BlockingQueue<Call>> getQueueClass(
+ String prefix, Configuration conf) {
+ String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
+ Class<?> queueClass = conf.getClass(name, LinkedBlockingQueue.class);
+ return CallQueueManager.convertQueueClass(queueClass, Call.class);
+ }
+
+ private String getQueueClassPrefix() {
+ return CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + port;
+ }
+
/*
* Refresh the call queue
*/
public synchronized void refreshCallQueue(Configuration conf) {
// Create the next queue
- String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." +
- this.port;
- Class queueClassToUse = conf.getClass(prefix + "." +
- CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
-
- callQueue.swapQueue(queueClassToUse, maxQueueSize, prefix, conf);
+ String prefix = getQueueClassPrefix();
+ callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
}
/** A call queued for handling. */
@@ -1225,9 +1235,9 @@ public abstract class Server {
Throwable cause = e;
while (cause != null) {
if (cause instanceof RetriableException) {
- return (RetriableException) cause;
+ return cause;
} else if (cause instanceof StandbyException) {
- return (StandbyException) cause;
+ return cause;
} else if (cause instanceof InvalidToken) {
// FIXME: hadoop method signatures are restricting the SASL
// callbacks to only returning InvalidToken, but some services
@@ -1297,7 +1307,7 @@ public abstract class Server {
private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
throws IOException, InterruptedException {
- RpcSaslProto saslResponse = null;
+ final RpcSaslProto saslResponse;
final SaslState state = saslMessage.getState(); // required
switch (state) {
case NEGOTIATE: {
@@ -1333,27 +1343,18 @@ public abstract class Server {
// SIMPLE is a legit option above. we will send no response
if (authMethod == AuthMethod.SIMPLE) {
switchToSimple();
+ saslResponse = null;
break;
}
// sasl server for tokens may already be instantiated
if (saslServer == null || authMethod != AuthMethod.TOKEN) {
saslServer = createSaslServer(authMethod);
}
- // fallthru to process sasl token
+ saslResponse = processSaslToken(saslMessage);
+ break;
}
case RESPONSE: {
- if (!saslMessage.hasToken()) {
- throw new SaslException("Client did not send a token");
- }
- byte[] saslToken = saslMessage.getToken().toByteArray();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Have read input token of size " + saslToken.length
- + " for processing by saslServer.evaluateResponse()");
- }
- saslToken = saslServer.evaluateResponse(saslToken);
- saslResponse = buildSaslResponse(
- saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,
- saslToken);
+ saslResponse = processSaslToken(saslMessage);
break;
}
default:
@@ -1362,6 +1363,22 @@ public abstract class Server {
return saslResponse;
}
+ private RpcSaslProto processSaslToken(RpcSaslProto saslMessage)
+ throws SaslException {
+ if (!saslMessage.hasToken()) {
+ throw new SaslException("Client did not send a token");
+ }
+ byte[] saslToken = saslMessage.getToken().toByteArray();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Have read input token of size " + saslToken.length
+ + " for processing by saslServer.evaluateResponse()");
+ }
+ saslToken = saslServer.evaluateResponse(saslToken);
+ return buildSaslResponse(
+ saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,
+ saslToken);
+ }
+
private void switchToSimple() {
// disable SASL and blank out any SASL server
authProtocol = AuthProtocol.NONE;
@@ -2123,12 +2140,9 @@ public abstract class Server {
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
// Setup appropriate callqueue
- String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." +
- this.port;
- Class queueClassToUse = conf.getClass(prefix + "." +
- CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
- this.callQueue = new CallQueueManager<Call>(queueClassToUse, maxQueueSize,
- prefix, conf);
+ final String prefix = getQueueClassPrefix();
+ this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
+ maxQueueSize, prefix, conf);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize =
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java?rev=1577720&r1=1577719&r2=1577720&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java Fri Mar 14 21:19:29 2014
@@ -18,22 +18,15 @@
package org.apache.hadoop.ipc;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.HashMap;
-import java.util.ArrayList;
-
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import org.junit.Assert;
-import org.junit.Assume;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
import org.junit.Test;
-import org.junit.Before;
-import org.junit.After;
public class TestCallQueueManager {
private CallQueueManager<FakeCall> manager;
@@ -146,23 +139,26 @@ public class TestCallQueueManager {
}
+ private static final Class<? extends BlockingQueue<FakeCall>> queueClass
+ = CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class);
+
@Test
public void testCallQueueCapacity() throws InterruptedException {
- manager = new CallQueueManager<FakeCall>(LinkedBlockingQueue.class, 10, "", null);
+ manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
}
@Test
public void testEmptyConsume() throws InterruptedException {
- manager = new CallQueueManager<FakeCall>(LinkedBlockingQueue.class, 10, "", null);
+ manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
assertCanTake(manager, 0, 1); // Fails since it's empty
}
@Test(timeout=60000)
public void testSwapUnderContention() throws InterruptedException {
- manager = new CallQueueManager<FakeCall>(LinkedBlockingQueue.class, 5000, "", null);
+ manager = new CallQueueManager<FakeCall>(queueClass, 5000, "", null);
ArrayList<Putter> producers = new ArrayList<Putter>();
ArrayList<Taker> consumers = new ArrayList<Taker>();
@@ -191,7 +187,7 @@ public class TestCallQueueManager {
Thread.sleep(10);
for (int i=0; i < 5; i++) {
- manager.swapQueue(LinkedBlockingQueue.class, 5000, "", null);
+ manager.swapQueue(queueClass, 5000, "", null);
}
// Stop the producers