You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/05/04 23:17:14 UTC
hadoop git commit: HADOOP-14207. "dfsadmin -refreshCallQueue" fails
with DecayRpcScheduler. Contributed by Surendra Singh Lihore.
Repository: hadoop
Updated Branches:
refs/heads/branch-2.8.1 4e39090e4 -> 2a4196a92
HADOOP-14207. "dfsadmin -refreshCallQueue" fails with DecayRpcScheduler. Contributed by Surendra Singh Lihore.
(cherry picked from commit 918dee10f56861d15bb6644edcefb4246a3e00a4)
(cherry picked from commit fc0e6de929c34eae05349f50f0a266660c6c0be1)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2a4196a9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2a4196a9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2a4196a9
Branch: refs/heads/branch-2.8.1
Commit: 2a4196a921f09e3613bdde588fcb1a866f857300
Parents: 4e39090
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue Apr 25 16:40:37 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu May 4 16:13:08 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/ipc/CallQueueManager.java | 1 +
.../apache/hadoop/ipc/DecayRpcScheduler.java | 31 ++++++-
.../apache/hadoop/ipc/DefaultRpcScheduler.java | 4 +
.../org/apache/hadoop/ipc/RpcScheduler.java | 2 +
.../hdfs/server/namenode/NameNodeRpcServer.java | 2 +-
.../org/apache/hadoop/TestRefreshCallQueue.java | 88 +++++++++++++-------
6 files changed, 94 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 7e32470..ca86deb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -257,6 +257,7 @@ public class CallQueueManager<E> {
Class<? extends BlockingQueue<E>> queueClassToUse, int maxSize,
String ns, Configuration conf) {
int priorityLevels = parseNumLevels(ns, conf);
+ this.scheduler.stop();
RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
ns, conf);
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index f40bd17..537de37 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReference;
+import javax.management.ObjectName;
+
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDoubleArray;
import org.apache.commons.lang.exception.ExceptionUtils;
@@ -162,6 +164,7 @@ public class DecayRpcScheduler implements RpcScheduler,
private final String namespace;
private final int topUsersCount; // e.g., report top 10 users' metrics
private static final double PRECISION = 0.0001;
+ private MetricsProxy metricsProxy;
/**
* This TimerTask will call decayCurrentCounts until
@@ -230,9 +233,8 @@ public class DecayRpcScheduler implements RpcScheduler,
DecayTask task = new DecayTask(this, timer);
timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis);
- MetricsProxy prox = MetricsProxy.getInstance(ns, numLevels);
- prox.setDelegate(this);
- prox.registerMetrics2Source(ns);
+ metricsProxy = MetricsProxy.getInstance(ns, numLevels);
+ metricsProxy.setDelegate(this);
}
// Load configs
@@ -671,11 +673,14 @@ public class DecayRpcScheduler implements RpcScheduler,
private WeakReference<DecayRpcScheduler> delegate;
private double[] averageResponseTimeDefault;
private long[] callCountInLastWindowDefault;
+ private ObjectName decayRpcSchedulerInfoBeanName;
private MetricsProxy(String namespace, int numLevels) {
averageResponseTimeDefault = new double[numLevels];
callCountInLastWindowDefault = new long[numLevels];
- MBeans.register(namespace, "DecayRpcScheduler", this);
+ decayRpcSchedulerInfoBeanName =
+ MBeans.register(namespace, "DecayRpcScheduler", this);
+ this.registerMetrics2Source(namespace);
}
public static synchronized MetricsProxy getInstance(String namespace,
@@ -689,6 +694,10 @@ public class DecayRpcScheduler implements RpcScheduler,
return mp;
}
+ public static synchronized void removeInstance(String namespace) {
+ MetricsProxy.INSTANCES.remove(namespace);
+ }
+
public void setDelegate(DecayRpcScheduler obj) {
this.delegate = new WeakReference<DecayRpcScheduler>(obj);
}
@@ -698,6 +707,14 @@ public class DecayRpcScheduler implements RpcScheduler,
DefaultMetricsSystem.instance().register(name, name, this);
}
+ void unregisterSource(String namespace) {
+ final String name = "DecayRpcSchedulerMetrics2." + namespace;
+ DefaultMetricsSystem.instance().unregisterSource(name);
+ if (decayRpcSchedulerInfoBeanName != null) {
+ MBeans.unregister(decayRpcSchedulerInfoBeanName);
+ }
+ }
+
@Override
public String getSchedulingDecisionSummary() {
DecayRpcScheduler scheduler = delegate.get();
@@ -921,4 +938,10 @@ public class DecayRpcScheduler implements RpcScheduler,
}
return decayedCallCounts;
}
+
+ @Override
+ public void stop() {
+ metricsProxy.unregisterSource(namespace);
+ MetricsProxy.removeInstance(namespace);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
index 08f74d4..0847af7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
@@ -42,4 +42,8 @@ public class DefaultRpcScheduler implements RpcScheduler {
public DefaultRpcScheduler(int priorityLevels, String namespace,
Configuration conf) {
}
+
+ @Override
+ public void stop() {
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
index 6f93b22..95c5a13 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
@@ -32,4 +32,6 @@ public interface RpcScheduler {
void addResponseTime(String name, int priorityLevel, int queueTime,
int processingTime);
+
+ void stop();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 03f77dc..9db51b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -504,7 +504,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
/** Allow access to the client RPC server for testing */
@VisibleForTesting
- RPC.Server getClientRpcServer() {
+ public RPC.Server getClientRpcServer() {
return clientRpcServer;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
index 5cb7def..d5eb9cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
@@ -33,49 +33,42 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.ipc.FairCallQueue;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
public class TestRefreshCallQueue {
private MiniDFSCluster cluster;
private Configuration config;
- private FileSystem fs;
static int mockQueueConstructions;
static int mockQueuePuts;
- private String callQueueConfigKey = "";
- private final Random rand = new Random();
+ private int nnPort = 0;
- @Before
- public void setUp() throws Exception {
- // We want to count additional events, so we reset here
- mockQueueConstructions = 0;
- mockQueuePuts = 0;
+ private void setUp(Class<?> queueClass) throws IOException {
int portRetries = 5;
- int nnPort;
-
+ Random rand = new Random();
for (; portRetries > 0; --portRetries) {
// Pick a random port in the range [30000,60000).
- nnPort = 30000 + rand.nextInt(30000);
+ nnPort = 30000 + rand.nextInt(30000);
config = new Configuration();
- callQueueConfigKey = "ipc." + nnPort + ".callqueue.impl";
- config.setClass(callQueueConfigKey,
- MockCallQueue.class, BlockingQueue.class);
+ String callQueueConfigKey = "ipc." + nnPort + ".callqueue.impl";
+ config.setClass(callQueueConfigKey, queueClass, BlockingQueue.class);
config.set("hadoop.security.authorization", "true");
FileSystem.setDefaultUri(config, "hdfs://localhost:" + nnPort);
- fs = FileSystem.get(config);
-
try {
- cluster = new MiniDFSCluster.Builder(config).nameNodePort(nnPort).build();
+ cluster = new MiniDFSCluster.Builder(config).nameNodePort(nnPort)
+ .build();
cluster.waitActive();
break;
} catch (BindException be) {
// Retry with a different port number.
}
}
-
if (portRetries == 0) {
// Bail if we get very unlucky with our choice of ports.
fail("Failed to pick an ephemeral port for the NameNode RPC server.");
@@ -83,8 +76,8 @@ public class TestRefreshCallQueue {
}
@After
- public void tearDown() throws Exception {
- if(cluster!=null) {
+ public void tearDown() throws IOException {
+ if (cluster != null) {
cluster.shutdown();
cluster = null;
}
@@ -105,29 +98,66 @@ public class TestRefreshCallQueue {
// Returns true if mock queue was used for put
public boolean canPutInMockQueue() throws IOException {
- int putsBefore = mockQueuePuts;
- fs.exists(new Path("/")); // Make an RPC call
- return mockQueuePuts > putsBefore;
+ FileSystem fs = FileSystem.get(config);
+ int putsBefore = mockQueuePuts;
+ fs.exists(new Path("/")); // Make an RPC call
+ fs.close();
+ return mockQueuePuts > putsBefore;
}
@Test
public void testRefresh() throws Exception {
- assertTrue("Mock queue should have been constructed", mockQueueConstructions > 0);
+ // We want to count additional events, so we reset here
+ mockQueueConstructions = 0;
+ mockQueuePuts = 0;
+ setUp(MockCallQueue.class);
+
+ assertTrue("Mock queue should have been constructed",
+ mockQueueConstructions > 0);
assertTrue("Puts are routed through MockQueue", canPutInMockQueue());
int lastMockQueueConstructions = mockQueueConstructions;
- // Replace queue with the queue specified in core-site.xml, which would be the LinkedBlockingQueue
+ // Replace queue with the queue specified in core-site.xml, which would be
+ // the LinkedBlockingQueue
DFSAdmin admin = new DFSAdmin(config);
String [] args = new String[]{"-refreshCallQueue"};
int exitCode = admin.run(args);
assertEquals("DFSAdmin should return 0", 0, exitCode);
- assertEquals("Mock queue should have no additional constructions", lastMockQueueConstructions, mockQueueConstructions);
+ assertEquals("Mock queue should have no additional constructions",
+ lastMockQueueConstructions, mockQueueConstructions);
try {
- assertFalse("Puts are routed through LBQ instead of MockQueue", canPutInMockQueue());
- } catch (IOException ioe){
+ assertFalse("Puts are routed through LBQ instead of MockQueue",
+ canPutInMockQueue());
+ } catch (IOException ioe) {
fail("Could not put into queue at all");
}
}
+ @Test
+ public void testRefreshCallQueueWithFairCallQueue() throws Exception {
+ setUp(FairCallQueue.class);
+ boolean oldValue = DefaultMetricsSystem.inMiniClusterMode();
+
+ // throw an error when we double-initialize JvmMetrics
+ DefaultMetricsSystem.setMiniClusterMode(false);
+
+ NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc();
+ try {
+ rpcServer.getClientRpcServer().refreshCallQueue(config);
+ } catch (Exception e) {
+ Throwable cause = e.getCause();
+ if ((cause instanceof MetricsException)
+ && cause.getMessage().contains(
+ "Metrics source DecayRpcSchedulerMetrics2.ipc." + nnPort
+ + " already exists!")) {
+ fail("DecayRpcScheduler metrics should be unregistered before"
+ + " reregister");
+ }
+ throw e;
+ } finally {
+ DefaultMetricsSystem.setMiniClusterMode(oldValue);
+ }
+ }
+
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org