You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/05/04 23:17:14 UTC

hadoop git commit: HADOOP-14207. "dfsadmin -refreshCallQueue" fails with DecayRpcScheduler. Contributed by Surendra Singh Lihore.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8.1 4e39090e4 -> 2a4196a92


HADOOP-14207. "dfsadmin -refreshCallQueue" fails with DecayRpcScheduler. Contributed by Surendra Singh Lihore.

(cherry picked from commit 918dee10f56861d15bb6644edcefb4246a3e00a4)
(cherry picked from commit fc0e6de929c34eae05349f50f0a266660c6c0be1)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2a4196a9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2a4196a9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2a4196a9

Branch: refs/heads/branch-2.8.1
Commit: 2a4196a921f09e3613bdde588fcb1a866f857300
Parents: 4e39090
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue Apr 25 16:40:37 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu May 4 16:13:08 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/ipc/CallQueueManager.java |  1 +
 .../apache/hadoop/ipc/DecayRpcScheduler.java    | 31 ++++++-
 .../apache/hadoop/ipc/DefaultRpcScheduler.java  |  4 +
 .../org/apache/hadoop/ipc/RpcScheduler.java     |  2 +
 .../hdfs/server/namenode/NameNodeRpcServer.java |  2 +-
 .../org/apache/hadoop/TestRefreshCallQueue.java | 88 +++++++++++++-------
 6 files changed, 94 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 7e32470..ca86deb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -257,6 +257,7 @@ public class CallQueueManager<E> {
       Class<? extends BlockingQueue<E>> queueClassToUse, int maxSize,
       String ns, Configuration conf) {
     int priorityLevels = parseNumLevels(ns, conf);
+    this.scheduler.stop();
     RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
         ns, conf);
     BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index f40bd17..537de37 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.management.ObjectName;
+
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AtomicDoubleArray;
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -162,6 +164,7 @@ public class DecayRpcScheduler implements RpcScheduler,
   private final String namespace;
   private final int topUsersCount; // e.g., report top 10 users' metrics
   private static final double PRECISION = 0.0001;
+  private MetricsProxy metricsProxy;
 
   /**
    * This TimerTask will call decayCurrentCounts until
@@ -230,9 +233,8 @@ public class DecayRpcScheduler implements RpcScheduler,
     DecayTask task = new DecayTask(this, timer);
     timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis);
 
-    MetricsProxy prox = MetricsProxy.getInstance(ns, numLevels);
-    prox.setDelegate(this);
-    prox.registerMetrics2Source(ns);
+    metricsProxy = MetricsProxy.getInstance(ns, numLevels);
+    metricsProxy.setDelegate(this);
   }
 
   // Load configs
@@ -671,11 +673,14 @@ public class DecayRpcScheduler implements RpcScheduler,
     private WeakReference<DecayRpcScheduler> delegate;
     private double[] averageResponseTimeDefault;
     private long[] callCountInLastWindowDefault;
+    private ObjectName decayRpcSchedulerInfoBeanName;
 
     private MetricsProxy(String namespace, int numLevels) {
       averageResponseTimeDefault = new double[numLevels];
       callCountInLastWindowDefault = new long[numLevels];
-      MBeans.register(namespace, "DecayRpcScheduler", this);
+      decayRpcSchedulerInfoBeanName =
+          MBeans.register(namespace, "DecayRpcScheduler", this);
+      this.registerMetrics2Source(namespace);
     }
 
     public static synchronized MetricsProxy getInstance(String namespace,
@@ -689,6 +694,10 @@ public class DecayRpcScheduler implements RpcScheduler,
       return mp;
     }
 
+    public static synchronized void removeInstance(String namespace) {
+      MetricsProxy.INSTANCES.remove(namespace);
+    }
+
     public void setDelegate(DecayRpcScheduler obj) {
       this.delegate = new WeakReference<DecayRpcScheduler>(obj);
     }
@@ -698,6 +707,14 @@ public class DecayRpcScheduler implements RpcScheduler,
       DefaultMetricsSystem.instance().register(name, name, this);
     }
 
+    void unregisterSource(String namespace) {
+      final String name = "DecayRpcSchedulerMetrics2." + namespace;
+      DefaultMetricsSystem.instance().unregisterSource(name);
+      if (decayRpcSchedulerInfoBeanName != null) {
+        MBeans.unregister(decayRpcSchedulerInfoBeanName);
+      }
+    }
+
     @Override
     public String getSchedulingDecisionSummary() {
       DecayRpcScheduler scheduler = delegate.get();
@@ -921,4 +938,10 @@ public class DecayRpcScheduler implements RpcScheduler,
     }
     return decayedCallCounts;
   }
+
+  @Override
+  public void stop() {
+    metricsProxy.unregisterSource(namespace);
+    MetricsProxy.removeInstance(namespace);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
index 08f74d4..0847af7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
@@ -42,4 +42,8 @@ public class DefaultRpcScheduler implements RpcScheduler {
   public DefaultRpcScheduler(int priorityLevels, String namespace,
       Configuration conf) {
   }
+
+  @Override
+  public void stop() {
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
index 6f93b22..95c5a13 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
@@ -32,4 +32,6 @@ public interface RpcScheduler {
 
   void addResponseTime(String name, int priorityLevel, int queueTime,
       int processingTime);
+
+  void stop();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 03f77dc..9db51b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -504,7 +504,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
 
   /** Allow access to the client RPC server for testing */
   @VisibleForTesting
-  RPC.Server getClientRpcServer() {
+  public RPC.Server getClientRpcServer() {
     return clientRpcServer;
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a4196a9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
index 5cb7def..d5eb9cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
@@ -33,49 +33,42 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.ipc.FairCallQueue;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class TestRefreshCallQueue {
   private MiniDFSCluster cluster;
   private Configuration config;
-  private FileSystem fs;
   static int mockQueueConstructions;
   static int mockQueuePuts;
-  private String callQueueConfigKey = "";
-  private final Random rand = new Random();
+  private int nnPort = 0;
 
-  @Before
-  public void setUp() throws Exception {
-    // We want to count additional events, so we reset here
-    mockQueueConstructions = 0;
-    mockQueuePuts = 0;
+  private void setUp(Class<?> queueClass) throws IOException {
     int portRetries = 5;
-    int nnPort;
-
+    Random rand = new Random();
     for (; portRetries > 0; --portRetries) {
       // Pick a random port in the range [30000,60000).
-      nnPort = 30000 + rand.nextInt(30000);  
+      nnPort = 30000 + rand.nextInt(30000);
       config = new Configuration();
-      callQueueConfigKey = "ipc." + nnPort + ".callqueue.impl";
-      config.setClass(callQueueConfigKey,
-          MockCallQueue.class, BlockingQueue.class);
+      String callQueueConfigKey = "ipc." + nnPort + ".callqueue.impl";
+      config.setClass(callQueueConfigKey, queueClass, BlockingQueue.class);
       config.set("hadoop.security.authorization", "true");
 
       FileSystem.setDefaultUri(config, "hdfs://localhost:" + nnPort);
-      fs = FileSystem.get(config);
-      
       try {
-        cluster = new MiniDFSCluster.Builder(config).nameNodePort(nnPort).build();
+        cluster = new MiniDFSCluster.Builder(config).nameNodePort(nnPort)
+            .build();
         cluster.waitActive();
         break;
       } catch (BindException be) {
         // Retry with a different port number.
       }
     }
-    
     if (portRetries == 0) {
       // Bail if we get very unlucky with our choice of ports.
       fail("Failed to pick an ephemeral port for the NameNode RPC server.");
@@ -83,8 +76,8 @@ public class TestRefreshCallQueue {
   }
 
   @After
-  public void tearDown() throws Exception {
-    if(cluster!=null) {
+  public void tearDown() throws IOException {
+    if (cluster != null) {
       cluster.shutdown();
       cluster = null;
     }
@@ -105,29 +98,66 @@ public class TestRefreshCallQueue {
 
   // Returns true if mock queue was used for put
   public boolean canPutInMockQueue() throws IOException {
-  int putsBefore = mockQueuePuts;
-  fs.exists(new Path("/")); // Make an RPC call
-  return mockQueuePuts > putsBefore;
+    FileSystem fs = FileSystem.get(config);
+    int putsBefore = mockQueuePuts;
+    fs.exists(new Path("/")); // Make an RPC call
+    fs.close();
+    return mockQueuePuts > putsBefore;
   }
 
   @Test
   public void testRefresh() throws Exception {
-    assertTrue("Mock queue should have been constructed", mockQueueConstructions > 0);
+    // We want to count additional events, so we reset here
+    mockQueueConstructions = 0;
+    mockQueuePuts = 0;
+    setUp(MockCallQueue.class);
+
+    assertTrue("Mock queue should have been constructed",
+        mockQueueConstructions > 0);
     assertTrue("Puts are routed through MockQueue", canPutInMockQueue());
     int lastMockQueueConstructions = mockQueueConstructions;
 
-    // Replace queue with the queue specified in core-site.xml, which would be the LinkedBlockingQueue
+    // Replace queue with the queue specified in core-site.xml, which would be
+    // the LinkedBlockingQueue
     DFSAdmin admin = new DFSAdmin(config);
     String [] args = new String[]{"-refreshCallQueue"};
     int exitCode = admin.run(args);
     assertEquals("DFSAdmin should return 0", 0, exitCode);
 
-    assertEquals("Mock queue should have no additional constructions", lastMockQueueConstructions, mockQueueConstructions);
+    assertEquals("Mock queue should have no additional constructions",
+        lastMockQueueConstructions, mockQueueConstructions);
     try {
-      assertFalse("Puts are routed through LBQ instead of MockQueue", canPutInMockQueue());
-    } catch (IOException ioe){
+      assertFalse("Puts are routed through LBQ instead of MockQueue",
+          canPutInMockQueue());
+    } catch (IOException ioe) {
       fail("Could not put into queue at all");
     }
   }
 
+  @Test
+  public void testRefreshCallQueueWithFairCallQueue() throws Exception {
+    setUp(FairCallQueue.class);
+    boolean oldValue = DefaultMetricsSystem.inMiniClusterMode();
+
+    // throw an error when we double-initialize JvmMetrics
+    DefaultMetricsSystem.setMiniClusterMode(false);
+
+    NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc();
+    try {
+      rpcServer.getClientRpcServer().refreshCallQueue(config);
+    } catch (Exception e) {
+      Throwable cause = e.getCause();
+      if ((cause instanceof MetricsException)
+          && cause.getMessage().contains(
+              "Metrics source DecayRpcSchedulerMetrics2.ipc." + nnPort
+                  + " already exists!")) {
+        fail("DecayRpcScheduler metrics should be unregistered before"
+            + " reregister");
+      }
+      throw e;
+    } finally {
+      DefaultMetricsSystem.setMiniClusterMode(oldValue);
+    }
+  }
+
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org