You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2015/10/27 23:05:43 UTC

[1/3] hbase git commit: HBASE-14674 Rpc handler / task monitoring seems to be broken after 0.98 (Heng Chen)

Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 61d913dd0 -> b1c24e1da
  refs/heads/branch-1.2 2b54a3541 -> 5faf604c0
  refs/heads/master dfa05284c -> d5d81d675


HBASE-14674 Rpc handler / task monitoring seems to be broken after 0.98 (Heng Chen)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d5d81d67
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d5d81d67
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d5d81d67

Branch: refs/heads/master
Commit: d5d81d675ace2d87c4ac19562b6b0a29da3d8902
Parents: dfa0528
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 27 14:24:21 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 27 14:59:29 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/CallRunner.java  | 19 ++++---------------
 .../hadoop/hbase/ipc/FifoRpcScheduler.java       |  2 +-
 .../org/apache/hadoop/hbase/ipc/RpcExecutor.java |  3 +++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java   | 16 +++++++++++++++-
 .../apache/hadoop/hbase/ipc/TestCallRunner.java  |  2 ++
 .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java |  4 ++++
 6 files changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d5d81d67/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index ede4b4e..5b52521 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -59,20 +58,22 @@ public class CallRunner {
     this.rpcServer = rpcServer;
     // Add size of the call to queue size.
     this.rpcServer.addCallSize(call.getSize());
-    this.status = getStatus();
   }
 
   public Call getCall() {
     return call;
   }
 
+  public void setStatus(MonitoredRPCHandler status) {
+    this.status = status;
+  }
+
   /**
    * Cleanup after ourselves... let go of references.
    */
   private void cleanup() {
     this.call = null;
     this.rpcServer = null;
-    this.status = null;
   }
 
   public void run() {
@@ -160,16 +161,4 @@ public class CallRunner {
       cleanup();
     }
   }
-
-  MonitoredRPCHandler getStatus() {
-    // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
-    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
-    if (status != null) {
-      return status;
-    }
-    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
-    status.pause("Waiting for a call");
-    RpcServer.MONITORED_RPC.set(status);
-    return status;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5d81d67/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index 8140c1c..621a8ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hbase.ipc.CallRunner;
 
 /**
  * A very simple {@code }RpcScheduler} that serves incoming requests in order.
@@ -70,6 +69,7 @@ public class FifoRpcScheduler extends RpcScheduler {
     executor.submit(new Runnable() {
       @Override
       public void run() {
+        task.setStatus(RpcServer.getStatus());
         task.run();
       }
     });

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5d81d67/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 709429d..27750a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
@@ -124,7 +125,9 @@ public abstract class RpcExecutor {
     try {
       while (running) {
         try {
+          MonitoredRPCHandler status = RpcServer.getStatus();
           CallRunner task = myQueue.take();
+          task.setStatus(status);
           try {
             activeHandlerCount.incrementAndGet();
             task.run();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5d81d67/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index c20e972..6cbce95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -2314,7 +2315,8 @@ public class RpcServer implements RpcServerInterface {
    * @param user client user
    * @param connection incoming connection
    * @param addr InetAddress of incoming connection
-   * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
+   * @throws org.apache.hadoop.security.authorize.AuthorizationException
+   *         when the client isn't authorized to talk the protocol
    */
   public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
   throws AuthorizationException {
@@ -2498,6 +2500,18 @@ public class RpcServer implements RpcServerInterface {
     return bsasi == null? null: bsasi.getBlockingService();
   }
 
+  static MonitoredRPCHandler getStatus() {
+    // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
+    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
+    if (status != null) {
+      return status;
+    }
+    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
+    status.pause("Waiting for a call");
+    RpcServer.MONITORED_RPC.set(status);
+    return status;
+  }
+
   /** Returns the remote side ip address when invoked inside an RPC
    *  Returns null incase of an error.
    *  @return InetAddress

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5d81d67/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
index ec06ff3..47c15ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
@@ -35,6 +36,7 @@ public class TestCallRunner {
     RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
     mockCall.connection = Mockito.mock(RpcServer.Connection.class);
     CallRunner cr = new CallRunner(mockRpcServer, mockCall);
+    cr.setStatus(new MonitoredRPCHandlerImpl());
     cr.run();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/d5d81d67/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 2b7ffb2..087429a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -87,6 +88,7 @@ public class TestSimpleRpcScheduler {
     scheduler.init(CONTEXT);
     scheduler.start();
     CallRunner task = createMockTask();
+    task.setStatus(new MonitoredRPCHandlerImpl());
     scheduler.dispatch(task);
     verify(task, timeout(1000)).run();
     scheduler.stop();
@@ -121,6 +123,7 @@ public class TestSimpleRpcScheduler {
       }
     };
     for (CallRunner task : tasks) {
+      task.setStatus(new MonitoredRPCHandlerImpl());
       doAnswer(answerToRun).when(task).run();
     }
 
@@ -303,6 +306,7 @@ public class TestSimpleRpcScheduler {
 
   private void doAnswerTaskExecution(final CallRunner callTask,
       final ArrayList<Integer> results, final int value, final int sleepInterval) {
+    callTask.setStatus(new MonitoredRPCHandlerImpl());
     doAnswer(new Answer<Object>() {
       @Override
       public Object answer(InvocationOnMock invocation) {


[3/3] hbase git commit: HBASE-14674 Rpc handler / task monitoring seems to be broken after 0.98 (Heng Chen)

Posted by en...@apache.org.
HBASE-14674 Rpc handler / task monitoring seems to be broken after 0.98 (Heng Chen)

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b1c24e1d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b1c24e1d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b1c24e1d

Branch: refs/heads/branch-1.1
Commit: b1c24e1da9d29402a08789785f4edc1ab1a188a8
Parents: 61d913d
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 27 14:24:21 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 27 15:02:42 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/CallRunner.java  | 19 ++++---------------
 .../hadoop/hbase/ipc/FifoRpcScheduler.java       |  2 +-
 .../org/apache/hadoop/hbase/ipc/RpcExecutor.java |  3 +++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java   | 16 +++++++++++++++-
 .../apache/hadoop/hbase/ipc/TestCallRunner.java  |  4 +++-
 .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java |  6 ++++++
 6 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b1c24e1d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index e329ef0..35e51a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -54,20 +53,22 @@ public class CallRunner {
     this.rpcServer = rpcServer;
     // Add size of the call to queue size.
     this.rpcServer.addCallSize(call.getSize());
-    this.status = getStatus();
   }
 
   public Call getCall() {
     return call;
   }
 
+  public void setStatus(MonitoredRPCHandler status) {
+    this.status = status;
+  }
+
   /**
    * Cleanup after ourselves... let go of references.
    */
   private void cleanup() {
     this.call = null;
     this.rpcServer = null;
-    this.status = null;
   }
 
   public void run() {
@@ -150,16 +151,4 @@ public class CallRunner {
       cleanup();
     }
   }
-
-  MonitoredRPCHandler getStatus() {
-    // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
-    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
-    if (status != null) {
-      return status;
-    }
-    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
-    status.pause("Waiting for a call");
-    RpcServer.MONITORED_RPC.set(status);
-    return status;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b1c24e1d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index 8140c1c..621a8ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hbase.ipc.CallRunner;
 
 /**
  * A very simple {@code }RpcScheduler} that serves incoming requests in order.
@@ -70,6 +69,7 @@ public class FifoRpcScheduler extends RpcScheduler {
     executor.submit(new Runnable() {
       @Override
       public void run() {
+        task.setStatus(RpcServer.getStatus());
         task.run();
       }
     });

http://git-wip-us.apache.org/repos/asf/hbase/blob/b1c24e1d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 709429d..27750a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
@@ -124,7 +125,9 @@ public abstract class RpcExecutor {
     try {
       while (running) {
         try {
+          MonitoredRPCHandler status = RpcServer.getStatus();
           CallRunner task = myQueue.take();
+          task.setStatus(status);
           try {
             activeHandlerCount.incrementAndGet();
             task.run();

http://git-wip-us.apache.org/repos/asf/hbase/blob/b1c24e1d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index e3ec22f..09c2a73 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
@@ -2289,7 +2290,8 @@ public class RpcServer implements RpcServerInterface {
    * @param user client user
    * @param connection incoming connection
    * @param addr InetAddress of incoming connection
-   * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
+   * @throws org.apache.hadoop.security.authorize.AuthorizationException
+   *         when the client isn't authorized to talk the protocol
    */
   public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
   throws AuthorizationException {
@@ -2473,6 +2475,18 @@ public class RpcServer implements RpcServerInterface {
     return bsasi == null? null: bsasi.getBlockingService();
   }
 
+  static MonitoredRPCHandler getStatus() {
+    // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
+    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
+    if (status != null) {
+      return status;
+    }
+    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
+    status.pause("Waiting for a call");
+    RpcServer.MONITORED_RPC.set(status);
+    return status;
+  }
+
   /** Returns the remote side ip address when invoked inside an RPC
    *  Returns null incase of an error.
    *  @return InetAddress

http://git-wip-us.apache.org/repos/asf/hbase/blob/b1c24e1d/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
index 9a9d784..71ac3d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.junit.Test;
@@ -35,6 +36,7 @@ public class TestCallRunner {
     RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
     mockCall.connection = Mockito.mock(RpcServer.Connection.class);
     CallRunner cr = new CallRunner(mockRpcServer, mockCall);
+    cr.setStatus(new MonitoredRPCHandlerImpl());
     cr.run();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b1c24e1d/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 3c50150..fc9e8a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -30,6 +30,9 @@ import com.google.protobuf.Message;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
@@ -88,6 +91,7 @@ public class TestSimpleRpcScheduler {
     scheduler.init(CONTEXT);
     scheduler.start();
     CallRunner task = createMockTask();
+    task.setStatus(new MonitoredRPCHandlerImpl());
     scheduler.dispatch(task);
     verify(task, timeout(1000)).run();
     scheduler.stop();
@@ -122,6 +126,7 @@ public class TestSimpleRpcScheduler {
       }
     };
     for (CallRunner task : tasks) {
+      task.setStatus(new MonitoredRPCHandlerImpl());
       doAnswer(answerToRun).when(task).run();
     }
 
@@ -302,6 +307,7 @@ public class TestSimpleRpcScheduler {
 
   private void doAnswerTaskExecution(final CallRunner callTask,
       final ArrayList<Integer> results, final int value, final int sleepInterval) {
+    callTask.setStatus(new MonitoredRPCHandlerImpl());
     doAnswer(new Answer<Object>() {
       @Override
       public Object answer(InvocationOnMock invocation) {


[2/3] hbase git commit: HBASE-14674 Rpc handler / task monitoring seems to be broken after 0.98 (Heng Chen)

Posted by en...@apache.org.
HBASE-14674 Rpc handler / task monitoring seems to be broken after 0.98 (Heng Chen)

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5faf604c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5faf604c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5faf604c

Branch: refs/heads/branch-1.2
Commit: 5faf604c0b0d3d2a9598cf565c5b45b35458fd46
Parents: 2b54a35
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Oct 27 14:24:21 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Tue Oct 27 15:01:29 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/CallRunner.java  | 19 ++++---------------
 .../hadoop/hbase/ipc/FifoRpcScheduler.java       |  2 +-
 .../org/apache/hadoop/hbase/ipc/RpcExecutor.java |  3 +++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java   | 16 +++++++++++++++-
 .../apache/hadoop/hbase/ipc/TestCallRunner.java  |  4 +++-
 .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java |  4 ++++
 6 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5faf604c/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index ede4b4e..5b52521 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -59,20 +58,22 @@ public class CallRunner {
     this.rpcServer = rpcServer;
     // Add size of the call to queue size.
     this.rpcServer.addCallSize(call.getSize());
-    this.status = getStatus();
   }
 
   public Call getCall() {
     return call;
   }
 
+  public void setStatus(MonitoredRPCHandler status) {
+    this.status = status;
+  }
+
   /**
    * Cleanup after ourselves... let go of references.
    */
   private void cleanup() {
     this.call = null;
     this.rpcServer = null;
-    this.status = null;
   }
 
   public void run() {
@@ -160,16 +161,4 @@ public class CallRunner {
       cleanup();
     }
   }
-
-  MonitoredRPCHandler getStatus() {
-    // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
-    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
-    if (status != null) {
-      return status;
-    }
-    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
-    status.pause("Waiting for a call");
-    RpcServer.MONITORED_RPC.set(status);
-    return status;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5faf604c/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index 8140c1c..621a8ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.hbase.ipc.CallRunner;
 
 /**
  * A very simple {@code }RpcScheduler} that serves incoming requests in order.
@@ -70,6 +69,7 @@ public class FifoRpcScheduler extends RpcScheduler {
     executor.submit(new Runnable() {
       @Override
       public void run() {
+        task.setStatus(RpcServer.getStatus());
         task.run();
       }
     });

http://git-wip-us.apache.org/repos/asf/hbase/blob/5faf604c/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 709429d..27750a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
@@ -124,7 +125,9 @@ public abstract class RpcExecutor {
     try {
       while (running) {
         try {
+          MonitoredRPCHandler status = RpcServer.getStatus();
           CallRunner task = myQueue.take();
+          task.setStatus(status);
           try {
             activeHandlerCount.incrementAndGet();
             task.run();

http://git-wip-us.apache.org/repos/asf/hbase/blob/5faf604c/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 186e2d2..289bbc8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -2298,7 +2299,8 @@ public class RpcServer implements RpcServerInterface {
    * @param user client user
    * @param connection incoming connection
    * @param addr InetAddress of incoming connection
-   * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol
+   * @throws org.apache.hadoop.security.authorize.AuthorizationException
+   *         when the client isn't authorized to talk the protocol
    */
   public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
   throws AuthorizationException {
@@ -2482,6 +2484,18 @@ public class RpcServer implements RpcServerInterface {
     return bsasi == null? null: bsasi.getBlockingService();
   }
 
+  static MonitoredRPCHandler getStatus() {
+    // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
+    MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
+    if (status != null) {
+      return status;
+    }
+    status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
+    status.pause("Waiting for a call");
+    RpcServer.MONITORED_RPC.set(status);
+    return status;
+  }
+
   /** Returns the remote side ip address when invoked inside an RPC
    *  Returns null incase of an error.
    *  @return InetAddress

http://git-wip-us.apache.org/repos/asf/hbase/blob/5faf604c/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
index b328e57..9691602 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -34,6 +35,7 @@ public class TestCallRunner {
     RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
     mockCall.connection = Mockito.mock(RpcServer.Connection.class);
     CallRunner cr = new CallRunner(mockRpcServer, mockCall);
+    cr.setStatus(new MonitoredRPCHandlerImpl());
     cr.run();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5faf604c/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 6645442..fa0727a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
@@ -86,6 +87,7 @@ public class TestSimpleRpcScheduler {
     scheduler.init(CONTEXT);
     scheduler.start();
     CallRunner task = createMockTask();
+    task.setStatus(new MonitoredRPCHandlerImpl());
     scheduler.dispatch(task);
     verify(task, timeout(1000)).run();
     scheduler.stop();
@@ -120,6 +122,7 @@ public class TestSimpleRpcScheduler {
       }
     };
     for (CallRunner task : tasks) {
+      task.setStatus(new MonitoredRPCHandlerImpl());
       doAnswer(answerToRun).when(task).run();
     }
 
@@ -302,6 +305,7 @@ public class TestSimpleRpcScheduler {
 
   private void doAnswerTaskExecution(final CallRunner callTask,
       final ArrayList<Integer> results, final int value, final int sleepInterval) {
+    callTask.setStatus(new MonitoredRPCHandlerImpl());
     doAnswer(new Answer<Object>() {
       @Override
       public Object answer(InvocationOnMock invocation) {