You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:44:48 UTC
svn commit: r1181968 - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/monitoring/
main/resources/hbase-webapps/static/
main/resources/hbase-webapps/taskmonitor/
test/java/org/apache/hadoop/hbase/mo...
Author: nspiegelberg
Date: Tue Oct 11 17:44:47 2011
New Revision: 1181968
URL: http://svn.apache.org/viewvc?rev=1181968&view=rev
Log:
Build Show Processlist for HBase
Summary:
This is an improved version of D291214, but diffed against trunk now that the
open source port has been committed.
Each RPC handler claims a single MonitoredRPCHandler, keeping it updated with
whether it's in a waiting state or actively servicing an RPC, and if so, serves
details about the RPC method being called.
Test Plan: Ran all unit tests, have been doing a lot of load testing on
Nicolas's dev cluster, about to start dark launch tests.
Reviewers: nspiegelberg
Reviewed By: nspiegelberg
CC: hbase@lists, nspiegelberg, riley, bhiller
Differential Revision: 300129
Task ID: 620988
Added:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css
hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Tue Oct 11 17:44:47 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -592,7 +593,8 @@ public class HBaseRPC {
}
@Override
- public Writable call(Writable param, long receivedTime) throws IOException {
+ public Writable call(Writable param, long receivedTime,
+ MonitoredRPCHandler status) throws IOException {
try {
Invocation call = (Invocation)param;
if(call.getMethodName() == null) {
@@ -600,6 +602,9 @@ public class HBaseRPC {
"cause is a version mismatch between client and server.");
}
if (verbose) log("Call: " + call);
+ status.setRPC(call.getMethodName(), call.getParameters(), receivedTime);
+ status.setRPCPacket(param);
+ status.resume("Servicing call");
Method method =
implementation.getMethod(call.getMethodName(),
call.getParameterClasses());
@@ -631,7 +636,8 @@ public class HBaseRPC {
// when tagging, we let TooLarge trump TooSmall to keep output simple
// note that large responses will often also be slow.
logResponse(call, (tooLarge ? "TooLarge" : "TooSlow"),
- startTime, processingTime, qTime, responseSize);
+ status.getClient(), startTime, processingTime, qTime,
+ responseSize);
if (tooSlow) {
// increment global slow RPC response counter
rpcMetrics.inc("slowResponse.", processingTime);
@@ -666,13 +672,14 @@ public class HBaseRPC {
* client Operations.
* @param call The call to log.
* @param tag The tag that will be used to indicate this event in the log.
+ * @param clientAddress The address of the client who made this call.
* @param startTime The time that the call was initiated, in ms.
* @param processingTime The duration that the call took to run, in ms.
* @param qTime The duration that the call spent on the queue
* prior to being initiated, in ms.
* @param responseSize The size in bytes of the response buffer.
*/
- private void logResponse(Invocation call, String tag,
+ private void logResponse(Invocation call, String tag, String clientAddress,
long startTime, int processingTime, int qTime, long responseSize)
throws IOException {
Object params[] = call.getParameters();
@@ -684,6 +691,7 @@ public class HBaseRPC {
responseInfo.put("processingtimems", processingTime);
responseInfo.put("queuetimems", qTime);
responseInfo.put("responsesize", responseSize);
+ responseInfo.put("client", clientAddress);
responseInfo.put("class", instance.getClass().getSimpleName());
responseInfo.put("method", call.getMethodName());
if (params.length == 2 && instance instanceof HRegionServer &&
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Oct 11 17:44:47 2011
@@ -24,6 +24,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
@@ -758,6 +760,10 @@ public abstract class HBaseServer {
return hostAddress;
}
+ public int getRemotePort() {
+ return remotePort;
+ }
+
public void setLastContact(long lastContact) {
this.lastContact = lastContact;
}
@@ -892,18 +898,27 @@ public abstract class HBaseServer {
/** Handles queued calls . */
private class Handler extends Thread {
static final int BUFFER_INITIAL_SIZE = 1024;
+ private MonitoredRPCHandler status;
+
public Handler(int instanceNumber) {
this.setDaemon(true);
- this.setName("IPC Server handler "+ instanceNumber + " on " + port);
+ String name = "IPC Server handler "+ instanceNumber + " on " + port;
+ this.setName(name);
+ this.status = TaskMonitor.get().createRPCStatus(name);
}
@Override
public void run() {
LOG.info(getName() + ": starting");
+ status.setStatus("starting");
SERVER.set(HBaseServer.this);
while (running) {
try {
+ status.pause("Waiting for a call");
Call call = callQueue.take(); // pop the queue; maybe blocked here
+ status.setStatus("Setting up call");
+ status.setConnection(call.connection.getHostAddress(),
+ call.connection.getRemotePort());
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": has #" + call.id + " from " +
@@ -916,7 +931,8 @@ public abstract class HBaseServer {
UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
UserGroupInformation.setCurrentUser(call.connection.ticket);
try {
- value = call(call.param, call.timestamp); // make the call
+ // make the call
+ value = call(call.param, call.timestamp, status);
} catch (Throwable e) {
LOG.info(getName()+", call "+call+": error: " + e, e);
errorClass = e.getClass().getName();
@@ -1089,11 +1105,12 @@ public abstract class HBaseServer {
/** Called for each call.
* @param param writable parameter
* @param receiveTime time
+ * @param status The task monitor for the associated handler.
* @return Writable
* @throws IOException e
*/
- public abstract Writable call(Writable param, long receiveTime)
- throws IOException;
+ public abstract Writable call(Writable param, long receiveTime,
+ MonitoredRPCHandler status) throws IOException;
/**
* The number of open RPC conections
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java?rev=1181968&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java Tue Oct 11 17:44:47 2011
@@ -0,0 +1,44 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A MonitoredTask implementation optimized for use with RPC Handlers
+ * handling frequent, short duration tasks. String concatenations and object
+ * allocations are avoided in methods that will be hit by every RPC call.
+ */
+public interface MonitoredRPCHandler extends MonitoredTask {
+ public abstract String getRPC();
+ public abstract String getRPC(boolean withParams);
+ public abstract long getRPCPacketLength();
+ public abstract String getClient();
+ public abstract long getRPCStartTime();
+ public abstract long getRPCQueueTime();
+ public abstract boolean isRPCRunning();
+ public abstract boolean isOperationRunning();
+
+ public abstract void setRPC(String methodName, Object [] params,
+ long queueTime);
+ public abstract void setRPCPacket(Writable param);
+ public abstract void setConnection(String clientAddress, int remotePort);
+}
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java?rev=1181968&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java Tue Oct 11 17:44:47 2011
@@ -0,0 +1,257 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A MonitoredTask implementation designed for use with RPC Handlers
+ * handling frequent, short duration tasks. String concatenations and object
+ * allocations are avoided in methods that will be hit by every RPC call.
+ */
+public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
+ implements MonitoredRPCHandler {
+ private String clientAddress;
+ private int remotePort;
+ private long rpcQueueTime;
+ private long rpcStartTime;
+ private String methodName = "";
+ private Object [] params = {};
+ private Writable packet;
+
+ public MonitoredRPCHandlerImpl() {
+ super();
+ // in this implementation, WAITING indicates that the handler is not
+ // actively servicing an RPC call.
+ setState(State.WAITING);
+ }
+
+ @Override
+ public synchronized MonitoredRPCHandlerImpl clone() {
+ return (MonitoredRPCHandlerImpl) super.clone();
+ }
+
+ /**
+ * Gets the status of this handler; if it is currently servicing an RPC,
+ * this status will include the RPC information.
+ * @return a String describing the current status.
+ */
+ @Override
+ public String getStatus() {
+ if (getState() != State.RUNNING) {
+ return super.getStatus();
+ }
+ return super.getStatus() + " from " + getClient() + ": " + getRPC();
+ }
+
+ /**
+ * Accesses the queue time for the currently running RPC on the
+ * monitored Handler.
+ * @return the queue timestamp or -1 if there is no RPC currently running.
+ */
+ public long getRPCQueueTime() {
+ if (getState() != State.RUNNING) {
+ return -1;
+ }
+ return rpcQueueTime;
+ }
+
+ /**
+ * Accesses the start time for the currently running RPC on the
+ * monitored Handler.
+ * @return the start timestamp or -1 if there is no RPC currently running.
+ */
+ public long getRPCStartTime() {
+ if (getState() != State.RUNNING) {
+ return -1;
+ }
+ return rpcStartTime;
+ }
+
+ /**
+ * Produces a string representation of the method currently being serviced
+ * by this Handler.
+ * @return a string representing the method call without parameters
+ */
+ public String getRPC() {
+ return getRPC(false);
+ }
+
+ /**
+ * Produces a string representation of the method currently being serviced
+ * by this Handler.
+ * @param withParams toggle inclusion of parameters in the RPC String
+ * @return A human-readable string representation of the method call.
+ */
+ public synchronized String getRPC(boolean withParams) {
+ if (getState() != State.RUNNING) {
+ // no RPC is currently running
+ return "";
+ }
+ StringBuilder buffer = new StringBuilder(256);
+ buffer.append(methodName);
+ if (withParams) {
+ buffer.append("(");
+ for (int i = 0; i < params.length; i++) {
+ if (i != 0)
+ buffer.append(", ");
+ buffer.append(params[i]);
+ }
+ buffer.append(")");
+ }
+ return buffer.toString();
+ }
+
+ /**
+ * Reports the size of the RPC packet currently being serviced by this
+ * Handler, when the packet exposes one (i.e. it is a WritableWithSize).
+ * @return the packet's getWritableSize(), or -1 if no RPC is running or the size is unknown.
+ */
+ public long getRPCPacketLength() {
+ if (getState() != State.RUNNING || packet == null) {
+ // no RPC is currently running, or we don't have an RPC's packet info
+ return -1L;
+ }
+ if (!(packet instanceof WritableWithSize)) {
+ // the packet passed to us doesn't expose size information
+ return -1L;
+ }
+ return ((WritableWithSize) packet).getWritableSize();
+ }
+
+ /**
+ * If an RPC call is currently running, produces a String representation of
+ * the connection from which it was received.
+ * @return A human-readable string representation of the address and port
+ * of the client.
+ */
+ public String getClient() {
+ return clientAddress + ":" + remotePort;
+ }
+
+ /**
+ * Indicates to the client whether this task is monitoring a currently active
+ * RPC call.
+ * @return true if the monitored handler is currently servicing an RPC call.
+ */
+ public boolean isRPCRunning() {
+ return getState() == State.RUNNING;
+ }
+
+ /**
+ * Indicates to the client whether this task is monitoring a currently active
+ * RPC call to a database command. (as defined by
+ * o.a.h.h.client.Operation)
+ * @return true if the monitored handler is currently servicing an RPC call
+ * to a database command.
+ */
+ public boolean isOperationRunning() {
+ if(!isRPCRunning()) {
+ return false;
+ }
+ for(Object param : params) {
+ if (param instanceof Operation) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Tells this instance that it is monitoring a new RPC call.
+ * @param methodName The name of the method that will be called by the RPC.
+ * @param params The parameters that will be passed to the indicated method.
+ */
+ public synchronized void setRPC(String methodName, Object [] params,
+ long queueTime) {
+ this.methodName = methodName;
+ this.params = params;
+ this.rpcStartTime = System.currentTimeMillis();
+ this.rpcQueueTime = queueTime;
+ this.state = State.RUNNING;
+ }
+
+ /**
+ * Gives this instance a reference to the Writable received by the RPC, so
+ * that it can later compute its size if asked for it.
+ * @param param The Writable received by the RPC for this call
+ */
+ public void setRPCPacket(Writable param) {
+ this.packet = param;
+ }
+
+ /**
+ * Registers current handler client details.
+ * @param clientAddress the address of the current client
+ * @param remotePort the port from which the client connected
+ */
+ public synchronized void setConnection(String clientAddress,
+ int remotePort) {
+ this.clientAddress = clientAddress;
+ this.remotePort = remotePort;
+ }
+
+ public synchronized Map<String, Object> toMap() {
+ // only include RPC info if the Handler is actively servicing an RPC call
+ Map<String, Object> map = super.toMap();
+ if (getState() != State.RUNNING) {
+ return map;
+ }
+ Map<String, Object> rpcJSON = new HashMap<String, Object>();
+ ArrayList<Object> paramList = new ArrayList<Object>(); // holds Strings or Operation maps
+ map.put("rpcCall", rpcJSON);
+ rpcJSON.put("queuetimems", getRPCQueueTime());
+ rpcJSON.put("starttimems", getRPCStartTime());
+ rpcJSON.put("clientaddress", clientAddress);
+ rpcJSON.put("remoteport", remotePort);
+ rpcJSON.put("packetlength", getRPCPacketLength());
+ rpcJSON.put("method", methodName);
+ rpcJSON.put("params", paramList);
+ for(Object param : params) {
+ if(param instanceof byte []) {
+ paramList.add(Bytes.toStringBinary((byte []) param));
+ } else if (param instanceof Operation) {
+ paramList.add(((Operation) param).toMap());
+ } else {
+ paramList.add(String.valueOf(param)); // null-safe: a null argument renders as "null", as in getRPC(true)
+ }
+ }
+ return map;
+ }
+
+ @Override
+ public String toString() {
+ if (getState() != State.RUNNING) {
+ return super.toString();
+ }
+ return super.toString() + ", rpcMethod=" + getRPC();
+ }
+
+}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java Tue Oct 11 17:44:47 2011
@@ -19,28 +19,32 @@
*/
package org.apache.hadoop.hbase.monitoring;
-public interface MonitoredTask {
+import java.io.IOException;
+import java.util.Map;
+
+public interface MonitoredTask extends Cloneable {
enum State {
RUNNING,
+ WAITING,
COMPLETE,
ABORTED;
}
public abstract long getStartTime();
-
public abstract String getDescription();
-
public abstract String getStatus();
-
+ public abstract long getStatusTime();
public abstract State getState();
-
+ public abstract long getStateTime();
public abstract long getCompletionTimestamp();
public abstract void markComplete(String msg);
+ public abstract void pause(String msg);
+ public abstract void resume(String msg);
public abstract void abort(String msg);
+ public void expireNow();
public abstract void setStatus(String status);
-
public abstract void setDescription(String description);
/**
@@ -49,5 +53,24 @@ public interface MonitoredTask {
*/
public abstract void cleanup();
+ /**
+ * Public exposure of Object.clone() in order to allow clients to easily
+ * capture current state.
+ * @return a copy of the object whose references will not change
+ */
+ public abstract MonitoredTask clone();
+
+ /**
+ * Creates a string map of internal details for extensible exposure of
+ * monitored tasks.
+ * @return A Map containing information for this task.
+ */
+ public abstract Map<String, Object> toMap() throws IOException;
+
+ /**
+ * Creates a JSON object for parseable exposure of monitored tasks.
+ * @return An encoded JSON object containing information for this task.
+ */
+ public abstract String toJSON() throws IOException;
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java Tue Oct 11 17:44:47 2011
@@ -19,19 +19,36 @@
*/
package org.apache.hadoop.hbase.monitoring;
-import com.google.common.annotations.VisibleForTesting;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
class MonitoredTaskImpl implements MonitoredTask {
private long startTime;
- private long completionTimestamp = -1;
+ private long statusTime;
+ private long stateTime;
- private String status;
- private String description;
+ private volatile String status;
+ private volatile String description;
- private State state = State.RUNNING;
+ protected volatile State state = State.RUNNING;
public MonitoredTaskImpl() {
startTime = System.currentTimeMillis();
+ statusTime = startTime;
+ stateTime = startTime;
+ }
+
+ @Override
+ public synchronized MonitoredTaskImpl clone() {
+ try {
+ return (MonitoredTaskImpl) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new AssertionError(); // Won't happen
+ }
}
@Override
@@ -50,32 +67,61 @@ class MonitoredTaskImpl implements Monit
}
@Override
+ public long getStatusTime() {
+ return statusTime;
+ }
+
+ @Override
public State getState() {
return state;
}
@Override
+ public long getStateTime() {
+ return stateTime;
+ }
+
+ @Override
public long getCompletionTimestamp() {
- return completionTimestamp;
+ if (state == State.COMPLETE || state == State.ABORTED) {
+ return stateTime;
+ }
+ return -1;
}
@Override
public void markComplete(String status) {
- state = State.COMPLETE;
+ setState(State.COMPLETE);
setStatus(status);
- completionTimestamp = System.currentTimeMillis();
+ }
+
+ @Override
+ public void pause(String msg) {
+ setState(State.WAITING);
+ setStatus(msg);
+ }
+
+ @Override
+ public void resume(String msg) {
+ setState(State.RUNNING);
+ setStatus(msg);
}
@Override
public void abort(String msg) {
setStatus(msg);
- state = State.ABORTED;
- completionTimestamp = System.currentTimeMillis();
+ setState(State.ABORTED);
}
@Override
public void setStatus(String status) {
this.status = status;
+ statusTime = System.currentTimeMillis();
+ }
+
+ protected void setState(State state) {
+ this.state = state;
+ stateTime = System.currentTimeMillis();
}
@Override
@@ -86,8 +132,7 @@ class MonitoredTaskImpl implements Monit
@Override
public void cleanup() {
if (state == State.RUNNING) {
- state = State.ABORTED;
- completionTimestamp = System.currentTimeMillis();
+ setState(State.ABORTED);
}
}
@@ -95,8 +140,41 @@ class MonitoredTaskImpl implements Monit
* Force the completion timestamp backwards so that
* it expires now.
*/
- @VisibleForTesting
- void expireNow() {
- completionTimestamp -= 180 * 1000;
+ public void expireNow() {
+ stateTime -= 180 * 1000;
+ }
+
+ @Override
+ public Map<String, Object> toMap() { // builds a fresh map of this task's details, keyed by field name
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("description", getDescription());
+ map.put("status", getStatus());
+ map.put("state", getState());
+ map.put("starttimems", getStartTime());
+ map.put("statustimems", getStatusTime()); // time of the last setStatus(); was wrongly the completion timestamp
+ map.put("statetimems", getStateTime()); // time of the last setState(); completion timestamp is -1 while running
+ return map;
+ }
+
+ @Override
+ public String toJSON() throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.writeValueAsString(toMap());
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(512);
+ sb.append(getDescription());
+ sb.append(": status=");
+ sb.append(getStatus());
+ sb.append(", state=");
+ sb.append(getState());
+ sb.append(", startTime=");
+ sb.append(getStartTime());
+ sb.append(", completionTime=");
+ sb.append(getCompletionTimestamp());
+ return sb.toString();
+ }
+
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java Tue Oct 11 17:44:47 2011
@@ -70,7 +70,18 @@ public class TaskMonitor {
stat.getClass().getClassLoader(),
new Class<?>[] { MonitoredTask.class },
new PassthroughInvocationHandler<MonitoredTask>(stat));
+ TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
+ tasks.add(pair);
+ return proxy;
+ }
+ public MonitoredRPCHandler createRPCStatus(String description) {
+ MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
+ stat.setDescription(description);
+ MonitoredRPCHandler proxy = (MonitoredRPCHandler) Proxy.newProxyInstance(
+ stat.getClass().getClassLoader(),
+ new Class<?>[] { MonitoredRPCHandler.class },
+ new PassthroughInvocationHandler<MonitoredRPCHandler>(stat));
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
tasks.add(pair);
return proxy;
@@ -107,11 +118,17 @@ public class TaskMonitor {
}
}
+ /**
+ * Produces a list containing copies of the current state of all non-expired
+ * MonitoredTasks handled by this TaskMonitor.
+ * @return A complete list of MonitoredTasks.
+ */
public synchronized List<MonitoredTask> getTasks() {
purgeExpiredTasks();
ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
for (TaskAndWeakRefPair pair : tasks) {
- ret.add(pair.get());
+ MonitoredTask t = pair.get();
+ ret.add(t.clone());
}
return ret;
}
@@ -159,7 +176,8 @@ public class TaskMonitor {
}
/**
- * An InvocationHandler that simply passes through calls to the original object.
+ * An InvocationHandler that simply passes through calls to the original
+ * object.
*/
private static class PassthroughInvocationHandler<T> implements InvocationHandler {
private T delegatee;
Modified: hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css (original)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css Tue Oct 11 17:44:47 2011
@@ -18,6 +18,11 @@ tr.task-monitor-COMPLETE td {
background-color: #afa;
}
+tr.task-monitor-WAITING td {
+ background-color: #ccc;
+ font-style: italic;
+}
+
tr.task-monitor-IDLE td {
background-color: #ccc;
font-style: italic;
Modified: hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp (original)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp Tue Oct 11 17:44:47 2011
@@ -1,16 +1,53 @@
<%@ page contentType="text/html;charset=UTF-8"
import="java.util.*"
- import="org.codehaus.jettison.json.JSONArray"
- import="org.codehaus.jettison.json.JSONException"
- import="org.codehaus.jettison.json.JSONObject"
import="org.apache.hadoop.util.StringUtils"
- import="org.apache.hadoop.hbase.ipc.HBaseRPC"
+ import="org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler"
import="org.apache.hadoop.hbase.monitoring.MonitoredTask"
import="org.apache.hadoop.hbase.monitoring.TaskMonitor" %><%
+ // extract requested filter
+ String filter = "general";
+ if (request.getParameter("filter") != null) {
+ filter = request.getParameter("filter");
+ }
TaskMonitor taskMonitor = TaskMonitor.get();
- List<MonitoredTask> tasks = taskMonitor.getTasks();
+ List<? extends MonitoredTask> tasks = taskMonitor.getTasks();
+ Iterator<? extends MonitoredTask> iter = tasks.iterator();
+ // apply requested filter
+ while (iter.hasNext()) {
+ MonitoredTask t = iter.next();
+ if (filter.equals("general")) {
+ if (t instanceof MonitoredRPCHandler)
+ iter.remove();
+ } else if (filter.equals("handler")) {
+ if (!(t instanceof MonitoredRPCHandler))
+ iter.remove();
+ } else if (filter.equals("rpc")) {
+ if (!(t instanceof MonitoredRPCHandler) ||
+ !((MonitoredRPCHandler) t).isRPCRunning())
+ iter.remove();
+ } else if (filter.equals("operation")) {
+ if (!(t instanceof MonitoredRPCHandler) ||
+ !((MonitoredRPCHandler) t).isOperationRunning())
+ iter.remove();
+ }
+ }
long now = System.currentTimeMillis();
Collections.reverse(tasks);
+ // output to JSON if requested
+ if(request.getParameter("format") != null &&
+ request.getParameter("format").equals("json")) {
+ %><%= "[" %><%
+ boolean first = true;
+ for(MonitoredTask task : tasks) {
+ if (first) {
+ first = false;
+ } else {
+ %><%= "," %><%
+ }
+ %><%= task.toJSON() %><%
+ }
+ %><%= "]" %><%
+ } else {
%><?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
@@ -23,6 +60,14 @@
<a id="logo" href="http://wiki.apache.org/lucene-hadoop/Hbase">
<img src="/static/hbase_logo_med.gif" alt="HBase Logo" title="HBase Logo" />
</a>
+ <div style="float:right;">
+ <a href="taskmonitor.jsp?filter=all">Show All Monitored Tasks</a> |
+ <a href="taskmonitor.jsp?filter=general">Show non-RPC Tasks</a> |
+ <a href="taskmonitor.jsp?filter=handler">Show All RPC Handler Tasks</a> |
+ <a href="taskmonitor.jsp?filter=rpc">Show Active RPC Calls</a> |
+ <a href="taskmonitor.jsp?filter=operation">Show Client Operations</a> |
+ <a href="taskmonitor.jsp?format=json&amp;filter=<%= java.net.URLEncoder.encode(filter, "UTF-8") %>">View as JSON</a><%-- filter comes from the request; URL-encode it so it cannot inject markup --%>
+ </div>
<h1 id="page_title">Task Monitor</h1>
<h2>Recent tasks</h2>
<% if(tasks.isEmpty()) { %>
@@ -35,6 +80,9 @@
<tr class="task-monitor-COMPLETE">
<td>COMPLETE</td>
</tr>
+ <tr class="task-monitor-WAITING">
+ <td>WAITING</td>
+ </tr>
<tr class="task-monitor-ABORTED">
<td>ABORTED</td>
</tr>
@@ -43,22 +91,22 @@
key.</p>
<table style="clear:right">
<tr>
+ <th>Start Time</th>
<th>Description</th>
+ <th>State</th>
<th>Status</th>
- <th>Age</th>
</tr>
<% for(MonitoredTask task : tasks) { %>
<tr class="task-monitor-<%= task.getState() %>">
+ <td><%= new Date(task.getStartTime()) %></td>
<td><%= task.getDescription() %></td>
- <td><%= task.getStatus() %></td>
- <td><%= StringUtils.formatTimeDiff(now, task.getStartTime()) %>
- <% if (task.getState() == MonitoredTask.State.COMPLETE) { %>
- (Completed <%= StringUtils.formatTimeDiff(now,
- task.getCompletionTimestamp()) %> ago)
- <% } else if (task.getState() == MonitoredTask.State.ABORTED) { %>
- (Aborted <%= StringUtils.formatTimeDiff(now,
- task.getCompletionTimestamp()) %> ago)
- <% } %>
+ <td><%= task.getState() %>
+ (since <%= StringUtils.formatTimeDiff(now,
+ task.getStateTime()) %> ago)
+ </td>
+ <td><%= task.getStatus() %>
+ (since <%= StringUtils.formatTimeDiff(now,
+ task.getStatusTime()) %> ago)
</td>
</tr>
<% } %>
@@ -66,3 +114,4 @@
<% } %>
</body>
</html>
+<% } %>
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java?rev=1181968&r1=1181967&r2=1181968&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java Tue Oct 11 17:44:47 2011
@@ -44,13 +44,13 @@ public class TestTaskMonitor {
// Mark it as finished
task.markComplete("Finished!");
- assertEquals(MonitoredTask.State.COMPLETE, taskFromTm.getState());
+ assertEquals(MonitoredTask.State.COMPLETE, task.getState());
// It should still show up in the TaskMonitor list
assertEquals(1, tm.getTasks().size());
// If we mark its completion time back a few minutes, it should get gced
- ((MonitoredTaskImpl)taskFromTm).expireNow();
+ task.expireNow();
assertEquals(0, tm.getTasks().size());
}