You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:44:48 UTC

svn commit: r1181968 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/monitoring/ main/resources/hbase-webapps/static/ main/resources/hbase-webapps/taskmonitor/ test/java/org/apache/hadoop/hbase/mo...

Author: nspiegelberg
Date: Tue Oct 11 17:44:47 2011
New Revision: 1181968

URL: http://svn.apache.org/viewvc?rev=1181968&view=rev
Log:
Build Show Processlist for HBase

Summary:
This is an improved version of D291214, but diffed against trunk now that the
open source port has been committed.

Each RPC handler claims a single MonitoredRPCHandler, keeping it updated with
whether it's in a waiting state or actively servicing an RPC, and if so, serves
details about the RPC method being called.

Test Plan: Ran all unit tests, have been doing a lot of load testing on
Nicolas's dev cluster, about to start dark launch tests.

Reviewers: nspiegelberg

Reviewed By: nspiegelberg

CC: hbase@lists, nspiegelberg, riley, bhiller

Differential Revision: 300129

Task ID: 620988

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
    hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css
    hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Tue Oct 11 17:44:47 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -592,7 +593,8 @@ public class HBaseRPC {
     }
 
     @Override
-    public Writable call(Writable param, long receivedTime) throws IOException {
+    public Writable call(Writable param, long receivedTime,
+        MonitoredRPCHandler status) throws IOException {
       try {
         Invocation call = (Invocation)param;
         if(call.getMethodName() == null) {
@@ -600,6 +602,9 @@ public class HBaseRPC {
               "cause is a version mismatch between client and server.");
         }
         if (verbose) log("Call: " + call);
+        status.setRPC(call.getMethodName(), call.getParameters(), receivedTime);
+        status.setRPCPacket(param);
+        status.resume("Servicing call");
         Method method =
           implementation.getMethod(call.getMethodName(),
                                    call.getParameterClasses());
@@ -631,7 +636,8 @@ public class HBaseRPC {
           // when tagging, we let TooLarge trump TooSmall to keep output simple
           // note that large responses will often also be slow.
           logResponse(call, (tooLarge ? "TooLarge" : "TooSlow"),
-              startTime, processingTime, qTime, responseSize);
+              status.getClient(), startTime, processingTime, qTime,
+              responseSize);
           if (tooSlow) {
             // increment global slow RPC response counter
             rpcMetrics.inc("slowResponse.", processingTime);
@@ -666,13 +672,14 @@ public class HBaseRPC {
      * client Operations.
      * @param call The call to log.
      * @param tag  The tag that will be used to indicate this event in the log.
+     * @param client          The address of the client who made this call.
      * @param startTime       The time that the call was initiated, in ms.
      * @param processingTime  The duration that the call took to run, in ms.
      * @param qTime           The duration that the call spent on the queue
      *                        prior to being initiated, in ms.
      * @param responseSize    The size in bytes of the response buffer.
      */
-    private void logResponse(Invocation call, String tag,
+    private void logResponse(Invocation call, String tag, String clientAddress,
         long startTime, int processingTime, int qTime, long responseSize)
       throws IOException {
       Object params[] = call.getParameters();
@@ -684,6 +691,7 @@ public class HBaseRPC {
       responseInfo.put("processingtimems", processingTime);
       responseInfo.put("queuetimems", qTime);
       responseInfo.put("responsesize", responseSize);
+      responseInfo.put("client", clientAddress);
       responseInfo.put("class", instance.getClass().getSimpleName());
       responseInfo.put("method", call.getMethodName());
       if (params.length == 2 && instance instanceof HRegionServer &&

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Oct 11 17:44:47 2011
@@ -24,6 +24,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
@@ -758,6 +760,10 @@ public abstract class HBaseServer {
       return hostAddress;
     }
 
+    public int getRemotePort() {
+      return remotePort;
+    }
+
     public void setLastContact(long lastContact) {
       this.lastContact = lastContact;
     }
@@ -892,18 +898,27 @@ public abstract class HBaseServer {
   /** Handles queued calls . */
   private class Handler extends Thread {
     static final int BUFFER_INITIAL_SIZE = 1024;
+    private MonitoredRPCHandler status;
+
     public Handler(int instanceNumber) {
       this.setDaemon(true);
-      this.setName("IPC Server handler "+ instanceNumber + " on " + port);
+      String name = "IPC Server handler "+ instanceNumber + " on " + port;
+      this.setName(name);
+      this.status = TaskMonitor.get().createRPCStatus(name);
     }
 
     @Override
     public void run() {
       LOG.info(getName() + ": starting");
+      status.setStatus("starting");
       SERVER.set(HBaseServer.this);
       while (running) {
         try {
+          status.pause("Waiting for a call");
           Call call = callQueue.take(); // pop the queue; maybe blocked here
+          status.setStatus("Setting up call");
+          status.setConnection(call.connection.getHostAddress(),
+              call.connection.getRemotePort());
 
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + ": has #" + call.id + " from " +
@@ -916,7 +931,8 @@ public abstract class HBaseServer {
           UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
           UserGroupInformation.setCurrentUser(call.connection.ticket);
           try {
-            value = call(call.param, call.timestamp);             // make the call
+            // make the call
+            value = call(call.param, call.timestamp, status);
           } catch (Throwable e) {
             LOG.info(getName()+", call "+call+": error: " + e, e);
             errorClass = e.getClass().getName();
@@ -1089,11 +1105,12 @@ public abstract class HBaseServer {
   /** Called for each call.
    * @param param writable parameter
    * @param receiveTime time
+   * @param status The task monitor for the associated handler.
    * @return Writable
    * @throws IOException e
    */
-  public abstract Writable call(Writable param, long receiveTime)
-                                                throws IOException;
+  public abstract Writable call(Writable param, long receiveTime,
+      MonitoredRPCHandler status) throws IOException;
 
   /**
    * The number of open RPC conections

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java?rev=1181968&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java Tue Oct 11 17:44:47 2011
@@ -0,0 +1,44 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A MonitoredTask implementation optimized for use with RPC Handlers
+ * handling frequent, short duration tasks. String concatenations and object
+ * allocations are avoided in methods that will be hit by every RPC call.
+ */
+public interface MonitoredRPCHandler extends MonitoredTask {
+  public abstract String getRPC();
+  public abstract String getRPC(boolean withParams);
+  public abstract long getRPCPacketLength();
+  public abstract String getClient();
+  public abstract long getRPCStartTime();
+  public abstract long getRPCQueueTime();
+  public abstract boolean isRPCRunning();
+  public abstract boolean isOperationRunning();
+
+  public abstract void setRPC(String methodName, Object [] params,
+      long queueTime);
+  public abstract void setRPCPacket(Writable param);
+  public abstract void setConnection(String clientAddress, int remotePort);
+}

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java?rev=1181968&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java Tue Oct 11 17:44:47 2011
@@ -0,0 +1,257 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A MonitoredTask implementation designed for use with RPC Handlers
+ * handling frequent, short duration tasks. String concatenations and object
+ * allocations are avoided in methods that will be hit by every RPC call.
+ */
+public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
+  implements MonitoredRPCHandler {
+  private String clientAddress;
+  private int remotePort;
+  private long rpcQueueTime;
+  private long rpcStartTime;
+  private String methodName = "";
+  private Object [] params = {};
+  private Writable packet;
+
+  public MonitoredRPCHandlerImpl() {
+    super();
+    // in this implementation, WAITING indicates that the handler is not
+    // actively servicing an RPC call.
+    setState(State.WAITING);
+  }
+
+  @Override
+  public synchronized MonitoredRPCHandlerImpl clone() {
+    return (MonitoredRPCHandlerImpl) super.clone();
+  }
+
+  /**
+   * Gets the status of this handler; if it is currently servicing an RPC,
+   * this status will include the RPC information.
+   * @return a String describing the current status.
+   */
+  @Override
+  public String getStatus() {
+    if (getState() != State.RUNNING) {
+      return super.getStatus();
+    }
+    return super.getStatus() + " from " + getClient() + ": " + getRPC();
+  }
+
+  /**
+   * Accesses the queue time for the currently running RPC on the
+   * monitored Handler.
+   * @return the queue timestamp or -1 if there is no RPC currently running.
+   */
+  public long getRPCQueueTime() {
+    if (getState() != State.RUNNING) {
+      return -1;
+    }
+    return rpcQueueTime;
+  }
+
+  /**
+   * Accesses the start time for the currently running RPC on the
+   * monitored Handler.
+   * @return the start timestamp or -1 if there is no RPC currently running.
+   */
+  public long getRPCStartTime() {
+    if (getState() != State.RUNNING) {
+      return -1;
+    }
+    return rpcStartTime;
+  }
+
+  /**
+   * Produces a string representation of the method currently being serviced
+   * by this Handler.
+   * @return a string representing the method call without parameters
+   */
+  public String getRPC() {
+    return getRPC(false);
+  }
+
+  /**
+   * Produces a string representation of the method currently being serviced
+   * by this Handler.
+   * @param withParams toggle inclusion of parameters in the RPC String
+   * @return A human-readable string representation of the method call.
+   */
+  public synchronized String getRPC(boolean withParams) {
+    if (getState() != State.RUNNING) {
+      // no RPC is currently running
+      return "";
+    }
+    StringBuilder buffer = new StringBuilder(256);
+    buffer.append(methodName);
+    if (withParams) {
+      buffer.append("(");
+      for (int i = 0; i < params.length; i++) {
+        if (i != 0)
+          buffer.append(", ");
+        buffer.append(params[i]);
+      }
+      buffer.append(")");
+    }
+    return buffer.toString();
+  }
+
+  /**
+   * Produces a string representation of the method currently being serviced
+   * by this Handler.
+   * @return A human-readable string representation of the method call.
+   */
+  public long getRPCPacketLength() {
+    if (getState() != State.RUNNING || packet == null) {
+      // no RPC is currently running, or we don't have an RPC's packet info
+      return -1L;
+    }
+    if (!(packet instanceof WritableWithSize)) {
+      // the packet passed to us doesn't expose size information
+      return -1L;
+    }
+    return ((WritableWithSize) packet).getWritableSize();
+  }
+
+  /**
+   * If an RPC call is currently running, produces a String representation of
+   * the connection from which it was received.
+   * @return A human-readable string representation of the address and port
+   *  of the client.
+   */
+  public String getClient() {
+    return clientAddress + ":" + remotePort;
+  }
+
+  /**
+   * Indicates to the client whether this task is monitoring a currently active
+   * RPC call.
+   * @return true if the monitored handler is currently servicing an RPC call.
+   */
+  public boolean isRPCRunning() {
+    return getState() == State.RUNNING;
+  }
+
+  /**
+   * Indicates to the client whether this task is monitoring a currently active
+   * RPC call to a database command. (as defined by
+   * o.a.h.h.client.Operation)
+   * @return true if the monitored handler is currently servicing an RPC call
+   * to a database command.
+   */
+  public boolean isOperationRunning() {
+    if(!isRPCRunning()) {
+      return false;
+    }
+    for(Object param : params) {
+      if (param instanceof Operation) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Tells this instance that it is monitoring a new RPC call.
+   * @param methodName The name of the method that will be called by the RPC.
+   * @param params The parameters that will be passed to the indicated method.
+   */
+  public synchronized void setRPC(String methodName, Object [] params,
+      long queueTime) {
+    this.methodName = methodName;
+    this.params = params;
+    this.rpcStartTime = System.currentTimeMillis();
+    this.rpcQueueTime = queueTime;
+    this.state = State.RUNNING;
+  }
+
+  /**
+   * Gives this instance a reference to the Writable received by the RPC, so
+   * that it can later compute its size if asked for it.
+   * @param param The Writable received by the RPC for this call
+   */
+  public void setRPCPacket(Writable param) {
+    this.packet = param;
+  }
+
+  /**
+   * Registers current handler client details.
+   * @param clientAddress the address of the current client
+   * @param remotePort the port from which the client connected
+   */
+  public synchronized void setConnection(String clientAddress,
+      int remotePort) {
+    this.clientAddress = clientAddress;
+    this.remotePort = remotePort;
+  }
+
+  public synchronized Map<String, Object> toMap() {
+    // only include RPC info if the Handler is actively servicing an RPC call
+    Map<String, Object> map = super.toMap();
+    if (getState() != State.RUNNING) {
+      return map;
+    }
+    Map<String, Object> rpcJSON = new HashMap<String, Object>();
+    ArrayList paramList = new ArrayList();
+    map.put("rpcCall", rpcJSON);
+    rpcJSON.put("queuetimems", getRPCQueueTime());
+    rpcJSON.put("starttimems", getRPCStartTime());
+    rpcJSON.put("clientaddress", clientAddress);
+    rpcJSON.put("remoteport", remotePort);
+    rpcJSON.put("packetlength", getRPCPacketLength());
+    rpcJSON.put("method", methodName);
+    rpcJSON.put("params", paramList);
+    for(Object param : params) {
+      if(param instanceof byte []) {
+        paramList.add(Bytes.toStringBinary((byte []) param));
+      } else if (param instanceof Operation) {
+        paramList.add(((Operation) param).toMap());
+      } else {
+        paramList.add(param.toString());
+      }
+    }
+    return map;
+  }
+
+  @Override
+  public String toString() {
+    if (getState() != State.RUNNING) {
+      return super.toString();
+    }
+    return super.toString() + ", rpcMethod=" + getRPC();
+  }
+
+}

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java Tue Oct 11 17:44:47 2011
@@ -19,28 +19,32 @@
  */
 package org.apache.hadoop.hbase.monitoring;
 
-public interface MonitoredTask {
+import java.io.IOException;
+import java.util.Map;
+
+public interface MonitoredTask extends Cloneable {
   enum State {
     RUNNING,
+    WAITING,
     COMPLETE,
     ABORTED;
   }
 
   public abstract long getStartTime();
-
   public abstract String getDescription();
-
   public abstract String getStatus();
-
+  public abstract long getStatusTime();
   public abstract State getState();
-
+  public abstract long getStateTime();
   public abstract long getCompletionTimestamp();
 
   public abstract void markComplete(String msg);
+  public abstract void pause(String msg);
+  public abstract void resume(String msg);
   public abstract void abort(String msg);
+  public void expireNow();
 
   public abstract void setStatus(String status);
-
   public abstract void setDescription(String description);
 
   /**
@@ -49,5 +53,24 @@ public interface MonitoredTask {
    */
   public abstract void cleanup();
 
+  /**
+   * Public exposure of Object.clone() in order to allow clients to easily
+   * capture current state.
+   * @returns a copy of the object whose references will not change
+   */
+  public abstract MonitoredTask clone();
+
+  /**
+   * Creates a string map of internal details for extensible exposure of
+   * monitored tasks.
+   * @return A Map containing information for this task.
+   */
+  public abstract Map<String, Object> toMap() throws IOException;
+
+  /**
+   * Creates a JSON object for parseable exposure of monitored tasks.
+   * @return An encoded JSON object containing information for this task.
+   */
+  public abstract String toJSON() throws IOException;
 
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java Tue Oct 11 17:44:47 2011
@@ -19,19 +19,36 @@
  */
 package org.apache.hadoop.hbase.monitoring;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 class MonitoredTaskImpl implements MonitoredTask {
   private long startTime;
-  private long completionTimestamp = -1;
+  private long statusTime;
+  private long stateTime;
 
-  private String status;
-  private String description;
+  private volatile String status;
+  private volatile String description;
 
-  private State state = State.RUNNING;
+  protected volatile State state = State.RUNNING;
 
   public MonitoredTaskImpl() {
     startTime = System.currentTimeMillis();
+    statusTime = startTime;
+    stateTime = startTime;
+  }
+
+  @Override
+  public synchronized MonitoredTaskImpl clone() {
+    try {
+      return (MonitoredTaskImpl) super.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new AssertionError(); // Won't happen
+    }
   }
 
   @Override
@@ -50,32 +67,61 @@ class MonitoredTaskImpl implements Monit
   }
 
   @Override
+  public long getStatusTime() {
+    return statusTime;
+  }
+
+  @Override
   public State getState() {
     return state;
   }
 
   @Override
+  public long getStateTime() {
+    return stateTime;
+  }
+
+  @Override
   public long getCompletionTimestamp() {
-    return completionTimestamp;
+    if (state == State.COMPLETE || state == State.ABORTED) {
+      return stateTime;
+    }
+    return -1;
   }
 
   @Override
   public void markComplete(String status) {
-    state = State.COMPLETE;
+    setState(State.COMPLETE);
     setStatus(status);
-    completionTimestamp = System.currentTimeMillis();
+  }
+
+  @Override
+  public void pause(String msg) {
+    setState(State.WAITING);
+    setStatus(msg);
+  }
+
+  @Override
+  public void resume(String msg) {
+    setState(State.RUNNING);
+    setStatus(msg);
   }
 
   @Override
   public void abort(String msg) {
     setStatus(msg);
-    state = State.ABORTED;
-    completionTimestamp = System.currentTimeMillis();
+    setState(State.ABORTED);
   }
 
   @Override
   public void setStatus(String status) {
     this.status = status;
+    statusTime = System.currentTimeMillis();
+  }
+
+  protected void setState(State state) {
+    this.state = state;
+    stateTime = System.currentTimeMillis();
   }
 
   @Override
@@ -86,8 +132,7 @@ class MonitoredTaskImpl implements Monit
   @Override
   public void cleanup() {
     if (state == State.RUNNING) {
-      state = State.ABORTED;
-      completionTimestamp = System.currentTimeMillis();
+      setState(State.ABORTED);
     }
   }
 
@@ -95,8 +140,41 @@ class MonitoredTaskImpl implements Monit
    * Force the completion timestamp backwards so that
    * it expires now.
    */
-  @VisibleForTesting
-  void expireNow() {
-    completionTimestamp -= 180 * 1000;
+  public void expireNow() {
+    stateTime -= 180 * 1000;
+  }
+
+  @Override
+  public Map<String, Object> toMap() {
+    Map<String, Object> map = new HashMap<String, Object>();
+    map.put("description", getDescription());
+    map.put("status", getStatus());
+    map.put("state", getState());
+    map.put("starttimems", getStartTime());
+    map.put("statustimems", getCompletionTimestamp());
+    map.put("statetimems", getCompletionTimestamp());
+    return map;
+  }
+
+  @Override
+  public String toJSON() throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    return mapper.writeValueAsString(toMap());
   }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(512);
+    sb.append(getDescription());
+    sb.append(": status=");
+    sb.append(getStatus());
+    sb.append(", state=");
+    sb.append(getState());
+    sb.append(", startTime=");
+    sb.append(getStartTime());
+    sb.append(", completionTime=");
+    sb.append(getCompletionTimestamp());
+    return sb.toString();
+  }
+
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java Tue Oct 11 17:44:47 2011
@@ -70,7 +70,18 @@ public class TaskMonitor {
         stat.getClass().getClassLoader(),
         new Class<?>[] { MonitoredTask.class },
         new PassthroughInvocationHandler<MonitoredTask>(stat));
+    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
+    tasks.add(pair);
+    return proxy;
+  }
 
+  public MonitoredRPCHandler createRPCStatus(String description) {
+    MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
+    stat.setDescription(description);
+    MonitoredRPCHandler proxy = (MonitoredRPCHandler) Proxy.newProxyInstance(
+        stat.getClass().getClassLoader(),
+        new Class<?>[] { MonitoredRPCHandler.class },
+        new PassthroughInvocationHandler<MonitoredRPCHandler>(stat));
     TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
     tasks.add(pair);
     return proxy;
@@ -107,11 +118,17 @@ public class TaskMonitor {
     }
   }
 
+  /**
+   * Produces a list containing copies of the current state of all non-expired
+   * MonitoredTasks handled by this TaskMonitor.
+   * @return A complete list of MonitoredTasks.
+   */
   public synchronized List<MonitoredTask> getTasks() {
     purgeExpiredTasks();
     ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
     for (TaskAndWeakRefPair pair : tasks) {
-      ret.add(pair.get());
+      MonitoredTask t = pair.get();
+      ret.add(t.clone());
     }
     return ret;
   }
@@ -159,7 +176,8 @@ public class TaskMonitor {
   }
 
   /**
-   * An InvocationHandler that simply passes through calls to the original object.
+   * An InvocationHandler that simply passes through calls to the original
+   * object.
    */
   private static class PassthroughInvocationHandler<T> implements InvocationHandler {
     private T delegatee;

Modified: hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css (original)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css Tue Oct 11 17:44:47 2011
@@ -18,6 +18,11 @@ tr.task-monitor-COMPLETE td {
   background-color: #afa;
 }
 
+tr.task-monitor-WAITING td {
+  background-color: #ccc;
+  font-style: italic;
+}
+
 tr.task-monitor-IDLE td {
   background-color: #ccc;
   font-style: italic;

Modified: hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp (original)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp Tue Oct 11 17:44:47 2011
@@ -1,16 +1,53 @@
 <%@ page contentType="text/html;charset=UTF-8"
   import="java.util.*"
-  import="org.codehaus.jettison.json.JSONArray"
-  import="org.codehaus.jettison.json.JSONException"
-  import="org.codehaus.jettison.json.JSONObject"
   import="org.apache.hadoop.util.StringUtils"
-  import="org.apache.hadoop.hbase.ipc.HBaseRPC"
+  import="org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler"
   import="org.apache.hadoop.hbase.monitoring.MonitoredTask"
   import="org.apache.hadoop.hbase.monitoring.TaskMonitor" %><%
+  // extract requested filter
+  String filter = "general";
+  if (request.getParameter("filter") != null) {
+    filter = request.getParameter("filter");
+  }
   TaskMonitor taskMonitor = TaskMonitor.get();
-  List<MonitoredTask> tasks = taskMonitor.getTasks();
+  List<? extends MonitoredTask> tasks = taskMonitor.getTasks();
+  Iterator<? extends MonitoredTask> iter = tasks.iterator();
+  // apply requested filter
+  while (iter.hasNext()) {
+    MonitoredTask t = iter.next();
+    if (filter.equals("general")) {
+      if (t instanceof MonitoredRPCHandler)
+        iter.remove();
+    } else if (filter.equals("handler")) {
+      if (!(t instanceof MonitoredRPCHandler))
+        iter.remove();
+    } else if (filter.equals("rpc")) {
+      if (!(t instanceof MonitoredRPCHandler) ||
+          !((MonitoredRPCHandler) t).isRPCRunning())
+        iter.remove();
+    } else if (filter.equals("operation")) {
+      if (!(t instanceof MonitoredRPCHandler) ||
+          !((MonitoredRPCHandler) t).isOperationRunning())
+        iter.remove();
+    }
+  }
   long now = System.currentTimeMillis();
   Collections.reverse(tasks);
+  // output to JSON if requested
+  if(request.getParameter("format") != null &&
+      request.getParameter("format").equals("json")) {
+      %><%= "[" %><%
+      boolean first = true;
+      for(MonitoredTask task : tasks) {
+        if (first) {
+          first = false;
+        } else {
+          %><%= "," %><%
+        }
+        %><%= task.toJSON() %><%
+      }
+      %><%= "]" %><%
+  } else {
 %><?xml version="1.0" encoding="UTF-8" ?>
 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
   "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
@@ -23,6 +60,14 @@
   <a id="logo" href="http://wiki.apache.org/lucene-hadoop/Hbase">
     <img src="/static/hbase_logo_med.gif" alt="HBase Logo" title="HBase Logo" />
   </a>
+  <div style="float:right;">
+    <a href="taskmonitor.jsp?filter=all">Show All Monitored Tasks</a> |
+    <a href="taskmonitor.jsp?filter=general">Show non-RPC Tasks</a> |
+    <a href="taskmonitor.jsp?filter=handler">Show All RPC Handler Tasks</a> |
+    <a href="taskmonitor.jsp?filter=rpc">Show Active RPC Calls</a> |
+    <a href="taskmonitor.jsp?filter=operation">Show Client Operations</a> |
+    <a href="taskmonitor.jsp?format=json&filter=<%= filter %>">View as JSON</a>
+  </div>
   <h1 id="page_title">Task Monitor</h1>
   <h2>Recent tasks</h2>
   <% if(tasks.isEmpty()) { %>
@@ -35,6 +80,9 @@
       <tr class="task-monitor-COMPLETE">
         <td>COMPLETE</td>
       </tr>
+      <tr class="task-monitor-WAITING">
+        <td>WAITING</td>
+      </tr>
       <tr class="task-monitor-ABORTED">
         <td>ABORTED</td>
       </tr>
@@ -43,22 +91,22 @@
     key.</p>
     <table style="clear:right">
       <tr>
+        <th>Start Time</th>
         <th>Description</th>
+        <th>State</th>
         <th>Status</th>
-        <th>Age</th>
       </tr>
       <% for(MonitoredTask task : tasks) { %>
         <tr class="task-monitor-<%= task.getState() %>">
+          <td><%= new Date(task.getStartTime()) %></td>
           <td><%= task.getDescription() %></td>
-          <td><%= task.getStatus() %></td>
-          <td><%= StringUtils.formatTimeDiff(now, task.getStartTime()) %>
-            <% if (task.getState() == MonitoredTask.State.COMPLETE) { %>
-              (Completed <%= StringUtils.formatTimeDiff(now,
-                              task.getCompletionTimestamp()) %> ago)
-            <% } else if (task.getState() == MonitoredTask.State.ABORTED) { %>
-              (Aborted <%= StringUtils.formatTimeDiff(now,
-                            task.getCompletionTimestamp()) %> ago)
-            <% } %>
+          <td><%= task.getState() %>
+              (since <%= StringUtils.formatTimeDiff(now,
+                              task.getStateTime()) %> ago)
+          </td>
+          <td><%= task.getStatus() %>
+              (since <%= StringUtils.formatTimeDiff(now,
+                              task.getStatusTime()) %> ago)
           </td>
         </tr>
       <% } %>
@@ -66,3 +114,4 @@
   <% } %>
 </body>
 </html>
+<% } %>

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java Tue Oct 11 17:44:47 2011
@@ -44,13 +44,13 @@ public class TestTaskMonitor {
 
     // Mark it as finished
     task.markComplete("Finished!");
-    assertEquals(MonitoredTask.State.COMPLETE, taskFromTm.getState());
+    assertEquals(MonitoredTask.State.COMPLETE, task.getState());
 
     // It should still show up in the TaskMonitor list
     assertEquals(1, tm.getTasks().size());
 
     // If we mark its completion time back a few minutes, it should get gced
-    ((MonitoredTaskImpl)taskFromTm).expireNow();
+    task.expireNow();
     assertEquals(0, tm.getTasks().size());
   }