You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:42:15 UTC

svn commit: r1181923 - in /hbase/branches/0.89: ./ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/monitoring/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/wa...

Author: nspiegelberg
Date: Tue Oct 11 17:42:14 2011
New Revision: 1181923

URL: http://svn.apache.org/viewvc?rev=1181923&view=rev
Log:
Port TaskMonitor & MonitoredTask from open source to hbase-90

Summary:
This is a port of the task monitor developed in the open source branch:
https://issues.apache.org/jira/browse/HBASE-3836

I also ported basic monitoring of HMaster and HRegion to these
new tools.

The immediate purpose of this port is to allow implementation of an hbase "show
processlist" (#620988) in a manner that will not require redundancy when
committing to open source.

Test Plan: Ran all unit tests and ran load tests on Nicolas's dev cluster.
Reviewed By: nspiegelberg
Reviewers: nspiegelberg
CC: hbase@lists, riley, nspiegelberg, lshepard, mattwkelly
Differential Revision: 278835

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
    hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/
    hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/index.html
    hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
Modified:
    hbase/branches/0.89/pom.xml
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java
    hbase/branches/0.89/src/main/resources/hbase-webapps/master/master.jsp
    hbase/branches/0.89/src/main/resources/hbase-webapps/regionserver/regionserver.jsp
    hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.89/pom.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/pom.xml?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/pom.xml (original)
+++ hbase/branches/0.89/pom.xml Tue Oct 11 17:42:14 2011
@@ -382,6 +382,12 @@
                   package="org.apache.hadoop.hbase.generated.regionserver"
                   webxml="${build.webapps}/regionserver/WEB-INF/web.xml"/>
 
+                <mkdir dir="${build.webapps}/taskmonitor/WEB-INF"/>
+                <jspcompiler uriroot="${src.webapps}/taskmonitor"
+                    outputdir="${generated.sources}"
+                  package="org.apache.hadoop.hbase.generated.taskmonitor"
+                  webxml="${build.webapps}/taskmonitor/WEB-INF/web.xml"/>
+
                 <exec executable="sh">
                     <arg line="${basedir}/src/saveVersion.sh ${project.version} ${generated.sources}"/>
                 </exec>

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct 11 17:42:14 2011
@@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.ipc.HMast
 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -540,14 +542,20 @@ public class HMaster extends Thread impl
   /** Main processing loop */
   @Override
   public void run() {
-   try {
-     joinCluster();
-     initPreferredAssignment();
-     startServiceThreads();
-   }catch (IOException e) {
-     LOG.fatal("Unhandled exception. Master quits.", e);
-     return;
-   }
+    MonitoredTask startupStatus =
+      TaskMonitor.get().createStatus("Master startup");
+    startupStatus.setDescription("Master startup");
+    try {
+      joinCluster();
+      initPreferredAssignment();
+      startupStatus.setStatus("Initializing master service threads");
+      startServiceThreads();
+      startupStatus.markComplete("Initialization successful");
+    } catch (IOException e) {
+      LOG.fatal("Unhandled exception. Master quits.", e);
+      startupStatus.cleanup();
+      return;
+    }
     try {
       /* Main processing loop */
       FINISHED: while (!this.closed.get()) {
@@ -579,9 +587,11 @@ public class HMaster extends Thread impl
       }
     } catch (Throwable t) {
       LOG.fatal("Unhandled exception. Starting shutdown.", t);
+      startupStatus.cleanup();
       this.closed.set(true);
     }
 
+    startupStatus.cleanup();
     if (!this.shutdownRequested.get()) {  // shutdown not by request
       shutdown();  // indicated that master is shutting down
       startShutdown();  // get started with shutdown: stop scanners etc.

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java Tue Oct 11 17:42:14 2011
@@ -0,0 +1,53 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+public interface MonitoredTask {
+  enum State {
+    RUNNING,
+    COMPLETE,
+    ABORTED;
+  }
+
+  public abstract long getStartTime();
+
+  public abstract String getDescription();
+
+  public abstract String getStatus();
+
+  public abstract State getState();
+
+  public abstract long getCompletionTimestamp();
+
+  public abstract void markComplete(String msg);
+  public abstract void abort(String msg);
+
+  public abstract void setStatus(String status);
+
+  public abstract void setDescription(String description);
+
+  /**
+   * Explicitly mark this status as able to be cleaned up,
+   * even though it might not be complete.
+   */
+  public abstract void cleanup();
+
+
+}

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java Tue Oct 11 17:42:14 2011
@@ -0,0 +1,102 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Default {@link MonitoredTask} implementation: records a task's description,
+ * latest status message, lifecycle state and timestamps.
+ *
+ * An instance is updated by the thread doing the work and read by whichever
+ * thread lists tasks through {@link TaskMonitor#getTasks()}, so every mutable
+ * field is volatile. Without that, readers could observe stale values or a
+ * torn 64-bit completion timestamp.
+ */
+class MonitoredTaskImpl implements MonitoredTask {
+  private final long startTime;
+  // Wall-clock millis at which the task completed or aborted; -1 until then.
+  private volatile long completionTimestamp = -1;
+
+  private volatile String status;
+  private volatile String description;
+
+  // The state-changing methods below write this field last, so a reader that
+  // sees a terminal state also sees the matching status and timestamp.
+  private volatile State state = State.RUNNING;
+
+  public MonitoredTaskImpl() {
+    startTime = System.currentTimeMillis();
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public String getDescription() {
+    return description;
+  }
+
+  @Override
+  public String getStatus() {
+    return status;
+  }
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  @Override
+  public long getCompletionTimestamp() {
+    return completionTimestamp;
+  }
+
+  /**
+   * Marks the task COMPLETE, recording {@code msg} as its final status and
+   * stamping the completion time.
+   */
+  @Override
+  public void markComplete(String msg) {
+    setStatus(msg);
+    completionTimestamp = System.currentTimeMillis();
+    state = State.COMPLETE;
+  }
+
+  /**
+   * Marks the task ABORTED, recording {@code msg} as its final status and
+   * stamping the completion time.
+   */
+  @Override
+  public void abort(String msg) {
+    setStatus(msg);
+    completionTimestamp = System.currentTimeMillis();
+    state = State.ABORTED;
+  }
+
+  @Override
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  @Override
+  public void setDescription(String description) {
+    this.description = description;
+  }
+
+  /**
+   * Aborts the task if it is still running. A task that already completed
+   * or aborted is left untouched.
+   */
+  @Override
+  public void cleanup() {
+    if (state == State.RUNNING) {
+      completionTimestamp = System.currentTimeMillis();
+      state = State.ABORTED;
+    }
+  }
+
+  /**
+   * Force the completion timestamp 180 seconds backwards so that a finished
+   * task is already past {@link TaskMonitor}'s expiration window. For a task
+   * that has not finished, the timestamp stays negative.
+   */
+  @VisibleForTesting
+  void expireNow() {
+    completionTimestamp -= 180 * 1000;
+  }
+
+  /** Human-readable summary, used e.g. when a leaked task is logged. */
+  @Override
+  public String toString() {
+    return "MonitoredTask[description=" + description
+        + ", status=" + status + ", state=" + state + "]";
+  }
+}

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java Tue Oct 11 17:42:14 2011
@@ -0,0 +1,176 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import java.lang.ref.WeakReference;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+/**
+ * Singleton which keeps track of tasks going on in this VM.
+ * A Task here is anything which takes more than a few seconds
+ * and the user might want to inquire about the status
+ */
+public class TaskMonitor {
+  private static final Log LOG = LogFactory.getLog(TaskMonitor.class);
+
+  // Don't keep around any tasks that have completed more than
+  // 60 seconds ago
+  private static final long EXPIRATION_TIME = 60*1000;
+
+  @VisibleForTesting
+  static final int MAX_TASKS = 1000;
+
+  private static TaskMonitor instance;
+
+  /**
+   * Tracked tasks, oldest first. Guarded by {@code this}: every access
+   * happens while holding this monitor's lock.
+   */
+  private final List<TaskAndWeakRefPair> tasks =
+    Lists.newArrayList();
+
+  /**
+   * Get singleton instance.
+   * TODO this would be better off scoped to a single daemon
+   */
+  public static synchronized TaskMonitor get() {
+    if (instance == null) {
+      instance = new TaskMonitor();
+    }
+    return instance;
+  }
+
+  /**
+   * Creates and registers a new task in the RUNNING state.
+   *
+   * The caller receives a proxy for the task. The monitor keeps only a weak
+   * reference to that proxy, so a caller that drops it without finishing the
+   * task is detected as a leak on the next purge.
+   *
+   * @param description human-readable description of the task
+   * @return the handle through which the caller reports progress
+   */
+  public MonitoredTask createStatus(String description) {
+    MonitoredTask stat = new MonitoredTaskImpl();
+    stat.setDescription(description);
+    MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(
+        stat.getClass().getClassLoader(),
+        new Class<?>[] { MonitoredTask.class },
+        new PassthroughInvocationHandler<MonitoredTask>(stat));
+
+    TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
+    // tasks is a plain ArrayList that purgeExpiredTasks() iterates and
+    // mutates under this lock; an unguarded add races with it and can
+    // corrupt the list or throw ConcurrentModificationException.
+    synchronized (this) {
+      tasks.add(pair);
+    }
+    return proxy;
+  }
+
+  /**
+   * Cleans up leaked tasks, drops tasks that finished more than
+   * EXPIRATION_TIME ago, and caps the list at MAX_TASKS entries by
+   * discarding the oldest ones.
+   */
+  private synchronized void purgeExpiredTasks() {
+    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator(); it.hasNext();) {
+      TaskAndWeakRefPair pair = it.next();
+      MonitoredTask stat = pair.get();
+
+      // The class who constructed this leaked it. So we can assume it's done.
+      if (pair.isDead() && stat.getState() == MonitoredTask.State.RUNNING) {
+        LOG.warn("Status " + stat.getDescription()
+            + " appears to have been leaked");
+        stat.cleanup();
+      }
+
+      if (canPurge(stat)) {
+        it.remove();
+      }
+    }
+
+    int excess = tasks.size() - MAX_TASKS;
+    if (excess > 0) {
+      LOG.warn("Too many actions in action monitor! Purging some.");
+      // Remove the oldest entries from the list itself. Reassigning tasks to
+      // a subList() view would keep the evicted entries reachable through
+      // the backing list and nest one more view on every purge.
+      tasks.subList(0, excess).clear();
+    }
+  }
+
+  /**
+   * Returns a snapshot of the currently tracked tasks, after purging expired
+   * ones. The returned objects are the implementations, not the proxies
+   * handed to task creators, so holding them does not hide a leak.
+   */
+  public synchronized List<MonitoredTask> getTasks() {
+    purgeExpiredTasks();
+    ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
+    for (TaskAndWeakRefPair pair : tasks) {
+      ret.add(pair.get());
+    }
+    return ret;
+  }
+
+  /** @return true if the task finished more than EXPIRATION_TIME ago */
+  private boolean canPurge(MonitoredTask stat) {
+    long cts = stat.getCompletionTimestamp();
+    return (cts > 0 && System.currentTimeMillis() - cts > EXPIRATION_TIME);
+  }
+
+  /**
+   * Pairs a task implementation with a weak reference to the proxy that
+   * passes calls through to it. In art form:
+   * <pre>
+   *     Proxy  &lt;------------------
+   *       |                       \
+   *       v                        \
+   * PassthroughInvocationHandler   |  weak reference
+   *       |                       /
+   * MonitoredTaskImpl            /
+   *       |                     /
+   * TaskAndWeakRefPair  -------/
+   * </pre>
+   * Since only the Proxy is returned to the creator of the MonitoredTask,
+   * the creator can leak it, and we detect that because our weak reference
+   * goes null. We still hold the implementation strongly, so we can log the
+   * leaked task and clean it up.
+   */
+  private static class TaskAndWeakRefPair {
+    private final MonitoredTask impl;
+    private final WeakReference<MonitoredTask> weakProxy;
+
+    public TaskAndWeakRefPair(MonitoredTask stat,
+        MonitoredTask proxy) {
+      this.impl = stat;
+      this.weakProxy = new WeakReference<MonitoredTask>(proxy);
+    }
+
+    public MonitoredTask get() {
+      return impl;
+    }
+
+    /** @return true once the proxy handed to the creator was collected */
+    public boolean isDead() {
+      return weakProxy.get() == null;
+    }
+  }
+
+  /**
+   * An InvocationHandler that simply passes through calls to the original
+   * object, rethrowing whatever the original object throws.
+   */
+  private static class PassthroughInvocationHandler<T>
+      implements InvocationHandler {
+    private final T delegatee;
+
+    public PassthroughInvocationHandler(T delegatee) {
+      this.delegatee = delegatee;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+      try {
+        return method.invoke(delegatee, args);
+      } catch (InvocationTargetException e) {
+        // Surface the delegate's own exception; letting the wrapper escape
+        // would make the proxy throw UndeclaredThrowableException instead.
+        throw e.getCause();
+      }
+    }
+  }
+}

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 17:42:14 2011
@@ -76,6 +76,8 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -471,15 +473,20 @@ public class HRegion implements HeapSize
    */
   public long initialize(final Progressable reporter)
   throws IOException {
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        "Initializing region " + this);
     // Write HRI to a file in case we need to recover .META.
+    status.setStatus("Writing region info on filesystem");
     checkRegioninfoOnFilesystem();
 
     // Remove temporary data left over from old regions
+    status.setStatus("Cleaning up temporary data from old regions");
     cleanupTmpDir();
 
     // Load in all the HStores.  Get maximum seqid.
     long maxSeqId = -1;
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
+      status.setStatus("Instantiating store for column family " + c);
       Store store = instantiateHStore(this.tableDir, c);
       this.stores.put(c.getName(), store);
       long storeSeqId = store.getMaxSequenceId();
@@ -488,11 +495,13 @@ public class HRegion implements HeapSize
       }
     }
     // Recover any edits if available.
-    maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
+    maxSeqId = replayRecoveredEditsIfAny(
+        this.regiondir, maxSeqId, reporter, status);
 
     // Get rid of any splits or merges that were lost in-progress.  Clean out
     // these directories here on open.  We may be opening a region that was
     // being split but we crashed in the middle of it all.
+    status.setStatus("Cleaning up detritus from prior splits");
     FSUtils.deleteDirectory(this.fs, new Path(regiondir, SPLITDIR));
     FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
 
@@ -507,6 +516,7 @@ public class HRegion implements HeapSize
     // (particularly if no recovered edits, seqid will be -1).
     long nextSeqid = maxSeqId + 1;
     LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
+    status.markComplete("Region opened successfully");
     return nextSeqid;
   }
 
@@ -647,11 +657,16 @@ public class HRegion implements HeapSize
    * @throws IOException e
    */
   public List<StoreFile> close(final boolean abort) throws IOException {
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        "Closing region " + this + (abort ? " due to abort" : ""));
     if (isClosed()) {
+      status.abort("Already got closed by another process");
       LOG.warn("region " + this + " already closed");
       return null;
     }
+    status.setStatus("Waiting for split lock");
     synchronized (splitLock) {
+      status.setStatus("Disabling compacts and flushes for region");
       boolean wasFlushing = false;
       synchronized (writestate) {
         // Disable compacting and flushing by background threads for this
@@ -674,11 +689,13 @@ public class HRegion implements HeapSize
       // that will clear out of the bulk of the memstore before we put up
       // the close flag?
       if (!abort && !wasFlushing && worthPreFlushing()) {
+        status.setStatus("Pre-flushing region before close");
         LOG.info("Running close preflush of " + this.getRegionNameAsString());
-        internalFlushcache();
+        internalFlushcache(status);
       }
       newScannerLock.writeLock().lock();
       this.closing.set(true);
+      status.setStatus("Disabling writes for close");
       try {
         splitsAndClosesLock.writeLock().lock();
         LOG.debug("Updates disabled for region, no outstanding scanners on " +
@@ -692,7 +709,7 @@ public class HRegion implements HeapSize
 
           // Don't flush the cache if we are aborting
           if (!abort) {
-            internalFlushcache();
+            internalFlushcache(status);
           }
 
           List<StoreFile> result = new ArrayList<StoreFile>();
@@ -700,6 +717,7 @@ public class HRegion implements HeapSize
             result.addAll(store.close());
           }
           this.closed.set(true);
+          status.markComplete("Closed");
           LOG.info("Closed " + this);
           return result;
         } finally {
@@ -1084,7 +1102,9 @@ public class HRegion implements HeapSize
    * because a Snapshot was not properly persisted.
    */
   public boolean flushcache() throws IOException {
+    MonitoredTask status = TaskMonitor.get().createStatus("Flushing" + this);
     if (this.closed.get()) {
+      status.abort("Skipped: closed");
       return false;
     }
     synchronized (writestate) {
@@ -1097,14 +1117,19 @@ public class HRegion implements HeapSize
               writestate.flushing + ", writesEnabled=" +
               writestate.writesEnabled);
         }
+        status.abort("Not flushing since " + (writestate.flushing ?
+              "already flushing" : "writes not enabled"));
         return false;
       }
     }
     try {
       // Prevent splits and closes
+      status.setStatus("Acquiring readlock on region");
       splitsAndClosesLock.readLock().lock();
       try {
-        return internalFlushcache();
+        boolean result = internalFlushcache(status);
+        status.markComplete("Flush successful");
+        return result;
       } finally {
         splitsAndClosesLock.readLock().unlock();
       }
@@ -1113,6 +1138,7 @@ public class HRegion implements HeapSize
         writestate.flushing = false;
         this.writestate.flushRequested = false;
         writestate.notifyAll();
+        status.cleanup();
       }
     }
   }
@@ -1144,6 +1170,7 @@ public class HRegion implements HeapSize
    * routes.
    *
    * <p> This method may block for some time.
+   * @param status
    *
    * @return true if the region needs compacting
    *
@@ -1151,20 +1178,22 @@ public class HRegion implements HeapSize
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  protected boolean internalFlushcache() throws IOException {
-    return internalFlushcache(this.log, -1);
+  protected boolean internalFlushcache(MonitoredTask status)
+    throws IOException {
+    return internalFlushcache(this.log, -1, status);
   }
 
   /**
    * @param wal Null if we're NOT to go via hlog/wal.
    * @param myseqid The seqid to use if <code>wal</code> is null writing out
    * flush file.
+   * @param status
    * @return true if the region needs compacting
    * @throws IOException
    * @see {@link #internalFlushcache()}
    */
-  protected boolean internalFlushcache(final HLog wal, final long myseqid)
-  throws IOException {
+  protected boolean internalFlushcache(final HLog wal, final long myseqid,
+      MonitoredTask status) throws IOException {
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
     // Clear flush flag.
     // Record latest flush time
@@ -1193,7 +1222,9 @@ public class HRegion implements HeapSize
     // We have to take a write lock during snapshot, or else a write could
     // end up in both snapshot and memstore (makes it difficult to do atomic
     // rows then)
+    status.setStatus("Obtaining lock to block concurrent updates");
     this.updatesLock.writeLock().lock();
+    status.setStatus("Preparing to flush by snapshotting stores");
     final long currentMemStoreSize = this.memstoreSize.get();
     //copy the array of per column family memstore values
     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(
@@ -1214,6 +1245,7 @@ public class HRegion implements HeapSize
       this.updatesLock.writeLock().unlock();
     }
 
+    status.setStatus("Flushing stores");
     LOG.debug("Finished snapshotting, commencing flushing stores");
 
     // Any failure from here on out will be catastrophic requiring server
@@ -1227,7 +1259,7 @@ public class HRegion implements HeapSize
       // just-made new flush store file.
 
       for (StoreFlusher flusher : storeFlushers) {
-        flusher.flushCache();
+        flusher.flushCache(status);
       }
 
       Callable<Void> atomicWork = internalPreFlushcacheCommit();
@@ -1276,6 +1308,7 @@ public class HRegion implements HeapSize
       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
           Bytes.toStringBinary(getRegionName()));
       dse.initCause(t);
+      status.abort("Flush failed: " + StringUtils.stringifyException(t));
       throw dse;
     }
 
@@ -1305,6 +1338,7 @@ public class HRegion implements HeapSize
         this + " in " + time + "ms, sequence id=" + sequenceId +
         ", compaction requested=" + compactionRequested +
         ((wal == null)? "; wal=null": ""));
+      status.setStatus("Finished memstore flush");
     }
     this.recentFlushes.add(new Pair<Long,Long>(time/1000,currentMemStoreSize));
 
@@ -2173,13 +2207,14 @@ public class HRegion implements HeapSize
    * @param minSeqId Any edit found in split editlogs needs to be in excess of
    * this minSeqId to be applied, else its skipped.
    * @param reporter
+   * @param status
    * @return the sequence id of the last edit added to this region out of the
    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
    * @throws UnsupportedEncodingException
    * @throws IOException
    */
   protected long replayRecoveredEditsIfAny(final Path regiondir,
-      final long minSeqId, final Progressable reporter)
+      final long minSeqId, final Progressable reporter, MonitoredTask status)
   throws UnsupportedEncodingException, IOException {
     long seqid = minSeqId;
     NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
@@ -2205,7 +2240,7 @@ public class HRegion implements HeapSize
     }
     if (seqid > minSeqId) {
       // Then we added some edits to memory. Flush and cleanup split edit files.
-      internalFlushcache(null, seqid);
+      internalFlushcache(null, seqid, status);
     }
     // Now delete the content of recovered edits.  We're done w/ them.
     for (Path file: files) {
@@ -2230,7 +2265,10 @@ public class HRegion implements HeapSize
   private long replayRecoveredEdits(final Path edits,
       final long minSeqId, final Progressable reporter)
     throws IOException {
-    LOG.info("Replaying edits from " + edits + "; minSeqId=" + minSeqId);
+    String msg = "Replaying edits from " + edits + "; minSeqId=" + minSeqId;
+    LOG.info(msg);
+    MonitoredTask status = TaskMonitor.get().createStatus(msg);
+    status.setStatus("Opening logs");
     HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
     try {
     long currentEditSeqId = minSeqId;
@@ -2281,40 +2319,51 @@ public class HRegion implements HeapSize
           flush = restoreEdit(store, kv);
           editsCount++;
         }
-        if (flush) internalFlushcache(null, currentEditSeqId);
+        if (flush) internalFlushcache(null, currentEditSeqId, status);
 
         // Every 'interval' edits, tell the reporter we're making progress.
         // Have seen 60k edits taking 3minutes to complete.
         if (reporter != null && (editsCount % interval) == 0) {
+          status.setStatus("Replaying edits..." +
+              " skipped=" + skippedEdits +
+              " edits=" + editsCount);
           reporter.progress();
         }
       }
     } catch (EOFException eof) {
       Path p = HLog.moveAsideBadEditsFile(fs, edits);
-      LOG.warn("Encountered EOF. Most likely due to Master failure during " +
+      msg = "Encountered EOF. Most likely due to Master failure during " +
           "log spliting, so we have this data in another edit.  " +
-          "Continuing, but renaming " + edits + " as " + p, eof);
+          "Continuing, but renaming " + edits + " as " + p;
+      LOG.warn(msg, eof);
+      status.setStatus(msg);
     } catch (IOException ioe) {
       // If the IOE resulted from bad file format,
       // then this problem is idempotent and retrying won't help
       if (ioe.getCause() instanceof ParseException) {
         Path p = HLog.moveAsideBadEditsFile(fs, edits);
-        LOG.warn("File corruption encountered!  " +
-            "Continuing, but renaming " + edits + " as " + p, ioe);
+        msg = "File corruption encountered!  " +
+            "Continuing, but renaming " + edits + " as " + p;
+        LOG.warn(msg, ioe);
+        status.setStatus(msg);
       } else {
         // other IO errors may be transient (bad network connection,
         // checksum exception on one datanode, etc).  throw & retry
+        status.abort(StringUtils.stringifyException(ioe));
         throw ioe;
       }
     }
+    msg = "Applied " + editsCount + ", skipped " + skippedEdits +
+        ", firstSeqIdInLog=" + firstSeqIdInLog +
+        ", maxSeqIdInLog=" + currentEditSeqId;
+    status.markComplete(msg);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
-          ", firstSeqIdInLog=" + firstSeqIdInLog +
-          ", maxSeqIdInLog=" + currentEditSeqId);
+      LOG.debug(msg);
     }
     return currentEditSeqId;
     } finally {
       reader.close();
+      status.cleanup();
     }
   }
 

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 17:42:14 2011
@@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -445,28 +447,34 @@ public class Store implements HeapSize {
    * previously.
    * @param logCacheFlushId flush sequence number
    * @param snapshot
+   * @param snapshotTimeRangeTracker
+   * @param status
    * @return true if a compaction is needed
    * @throws IOException
    */
   private StoreFile flushCache(final long logCacheFlushId,
       SortedSet<KeyValue> snapshot,
-      TimeRangeTracker snapshotTimeRangeTracker) throws IOException {
+      TimeRangeTracker snapshotTimeRangeTracker,
+      MonitoredTask status) throws IOException { // receives flush-phase progress updates
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
-    return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker);
+    return internalFlushCache(snapshot, logCacheFlushId,
+        snapshotTimeRangeTracker, status);
   }
 
   /*
    * @param cache
    * @param logCacheFlushId
+   * @param snapshotTimeRangeTracker
+   * @param status
    * @return StoreFile created.
    * @throws IOException
    */
   private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
       final long logCacheFlushId,
-      TimeRangeTracker snapshotTimeRangeTracker)
-      throws IOException {
+      TimeRangeTracker snapshotTimeRangeTracker,
+      MonitoredTask status) throws IOException {
     StoreFile.Writer writer;
     String fileName;
     long flushed = 0;
@@ -479,6 +487,7 @@ public class Store implements HeapSize {
     // flush to list of store files.  Add cleanup of anything put on filesystem
     // if we fail.
     synchronized (flushLock) {
+      status.setStatus("Flushing " + this + ": creating writer");
       // A. Write the map out to the disk
       writer = createWriterInTmp(set.size());
       writer.setTimeRangeTracker(snapshotTimeRangeTracker);
@@ -495,7 +504,9 @@ public class Store implements HeapSize {
       } finally {
         // Write out the log sequence number that corresponds to this output
         // hfile.  The hfile is current up to and including logCacheFlushId.
+        status.setStatus("Flushing " + this + ": appending metadata");
         writer.appendMetadata(logCacheFlushId, false);
+        status.setStatus("Flushing " + this + ": closing flushed file");
         writer.close();
       }
     }
@@ -628,6 +639,10 @@ public class Store implements HeapSize {
     long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
 
     // Ready to go. Have list of files to compact.
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        (cr.isMajor() ? "Major " : "") + "Compaction of "
+        + this.storeNameStr + " on "
+        + this.region.getRegionInfo().getRegionNameAsString());
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
         + this.storeNameStr + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
@@ -636,6 +651,7 @@ public class Store implements HeapSize {
 
     StoreFile sf = null;
     try {
+      status.setStatus("Compacting " + filesToCompact.size() + " file(s)");
       StoreFile.Writer writer = compactStores(filesToCompact, cr.isMajor(), maxId);
       // Move the compaction into place.
       sf = completeCompaction(filesToCompact, writer);
@@ -645,6 +661,7 @@ public class Store implements HeapSize {
       }
     }
 
+    status.markComplete("Completed compaction");
     LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
         + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
@@ -1690,8 +1707,9 @@ public class Store implements HeapSize {
     }
 
     @Override
-    public void flushCache() throws IOException {
-      storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker);
+    public void flushCache(MonitoredTask status) throws IOException {
+      storeFile = Store.this.flushCache(cacheFlushId, snapshot,
+          snapshotTimeRangeTracker, status); // forward the caller's status object unchanged
     }
 
     @Override

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Tue Oct 11 17:42:14 2011
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+
 /**
  * A package protected interface for a store flushing.
  * A store flusher carries the state required to prepare/flush/commit the
@@ -43,9 +45,10 @@ interface StoreFlusher {
    * A length operation which doesn't require locking out any function
    * of the store.
    *
+   * @param status
    * @throws IOException in case the flush fails
    */
-  void flushCache() throws IOException;
+  void flushCache(MonitoredTask status) throws IOException;
 
   /**
    * Commit the flush - add the store file to the store and clear the

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Oct 11 17:42:14 2011
@@ -74,6 +74,8 @@ import org.apache.hadoop.hbase.HServerIn
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -1252,11 +1254,14 @@ public class HLog implements Syncable {
   public static List<Path> splitLog(final Path rootDir, final Path srcDir,
     Path oldLogDir, final FileSystem fs, final Configuration conf)
   throws IOException {
-
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        "Splitting logs in " + srcDir);
     long startTime = System.currentTimeMillis();
     List<Path> splits = null;
+    status.setStatus("Determining files to split");
     if (!fs.exists(srcDir)) {
       // Nothing to do
+      status.markComplete("No log directory existed to split.");
       return splits;
     }
     FileStatus [] logfiles = fs.listStatus(srcDir);
@@ -1266,6 +1271,7 @@ public class HLog implements Syncable {
     }
     LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
       srcDir.toString());
+    status.setStatus("Performing split");
     splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
     try {
       FileStatus[] files = fs.listStatus(srcDir);
@@ -1285,6 +1291,7 @@ public class HLog implements Syncable {
       throw io;
     }
     lastSplitTime = System.currentTimeMillis() - startTime;
+    status.markComplete("Log splits complete.");
     LOG.info("hlog file splitting completed in " + lastSplitTime +
         " ms for " + srcDir.toString());
     return splits;
@@ -1354,6 +1361,8 @@ public class HLog implements Syncable {
       Collections.synchronizedMap(
         new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
     List<Path> splits = null;
+    MonitoredTask status = TaskMonitor.get().createStatus(
+        "Splitting logs in " + srcDir);
 
     // Number of logs in a read batch
     // More means faster but bigger mem consumption
@@ -1363,6 +1372,7 @@ public class HLog implements Syncable {
 
     lastSplitSize = 0;
 
+    status.setStatus("Performing split");
     try {
       int i = -1;
       while (i < logfiles.length) {
@@ -1388,6 +1398,7 @@ public class HLog implements Syncable {
             LOG.warn("EOF from hlog " + logPath + ".  continuing");
             processedLogs.add(logPath);
           } catch (InterruptedIOException iioe) {
+            status.abort(StringUtils.stringifyException(iioe));
             throw iioe;
           } catch (IOException e) {
             // If the IOE resulted from bad file format,
@@ -1401,6 +1412,7 @@ public class HLog implements Syncable {
                   ". Marking as corrupted", e);
                 corruptedLogs.add(logPath);
               } else {
+                status.abort(StringUtils.stringifyException(e));
                 throw e;
               }
             }
@@ -1409,6 +1421,7 @@ public class HLog implements Syncable {
         writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
       }
       if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
+        status.abort("Discovered orphan hlog after split");
         throw new IOException("Discovered orphan hlog after split. Maybe " +
           "HRegionServer was not dead when we started");
       }
@@ -1420,7 +1433,9 @@ public class HLog implements Syncable {
         LOG.debug("Closed " + wap.p);
       }
     }
+    status.setStatus("Archiving logs after completed split");
     archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+    status.markComplete("Split completed");
     return splits;
   }
 

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java Tue Oct 11 17:42:14 2011
@@ -24,6 +24,7 @@ import org.apache.hadoop.http.HttpServer
 import org.mortbay.jetty.handler.ContextHandlerCollection;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.DefaultServlet;
+import org.mortbay.jetty.webapp.WebAppContext;
 
 import java.io.IOException;
 import java.net.URL;
@@ -79,6 +80,12 @@ public class InfoServer extends HttpServ
       logContext.addServlet(DefaultServlet.class, "/");
       defaultContexts.put(logContext, true);
     }
+    // Now bring up the task monitor
+    WebAppContext taskMonitorContext =
+      new WebAppContext(parent, "taskmontior", "/taskmonitor");
+    taskMonitorContext.addServlet(DefaultServlet.class, "/");
+    taskMonitorContext.setWar(appDir + "/taskmonitor");
+    defaultContexts.put(taskMonitorContext, true);
   }
 
   /**

Modified: hbase/branches/0.89/src/main/resources/hbase-webapps/master/master.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/master/master.jsp?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/master/master.jsp (original)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/master/master.jsp Tue Oct 11 17:42:14 2011
@@ -37,7 +37,7 @@
 <body>
 <a id="logo" href="http://wiki.apache.org/lucene-hadoop/Hbase"><img src="/static/hbase_logo_med.gif" alt="HBase Logo" title="HBase Logo" /></a>
 <h1 id="page_title">Master: <%=master.getMasterAddress().getHostname()%>:<%=master.getMasterAddress().getPort()%></h1>
-<p id="links_menu"><a href="/logs/">Local logs</a>, <a href="/stacks">Thread Dump</a>, <a href="/logLevel">Log Level</a></p>
+<p id="links_menu"><a href="/logs/">Local logs</a>, <a href="/stacks">Thread Dump</a>, <a href="/logLevel">Log Level</a>, <a href="/taskmonitor">Task Monitor</a></p>
 
 <!-- Various warnings that cluster admins should be aware of -->
 <% if (JvmVersion.isBadJvmVersion()) { %>

Modified: hbase/branches/0.89/src/main/resources/hbase-webapps/regionserver/regionserver.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/regionserver/regionserver.jsp?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/regionserver/regionserver.jsp (original)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/regionserver/regionserver.jsp Tue Oct 11 17:42:14 2011
@@ -33,7 +33,7 @@
 <body>
 <a id="logo" href="http://wiki.apache.org/lucene-hadoop/Hbase"><img src="/static/hbase_logo_med.gif" alt="HBase Logo" title="HBase Logo" /></a>
 <h1 id="page_title">Region Server: <%= serverInfo.getServerAddress().getHostname() %>:<%= serverInfo.getServerAddress().getPort() %></h1>
-<p id="links_menu"><a href="/logs/">Local logs</a>, <a href="/stacks">Thread Dump</a>, <a href="/logLevel">Log Level</a></p>
+<p id="links_menu"><a href="/logs/">Local logs</a>, <a href="/stacks">Thread Dump</a>, <a href="/logLevel">Log Level</a>, <a href="/taskmonitor">Task Monitor</a></p>
 <hr id="head_rule" />
 
 <h2>Region Server Attributes</h2>

Modified: hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css (original)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css Tue Oct 11 17:42:14 2011
@@ -1,8 +1,8 @@
-h1, h2, h3 { color: DarkSlateBlue }
-table { border: thin solid DodgerBlue }
-tr { border: thin solid DodgerBlue }
-td { border: thin solid DodgerBlue }
-th { border: thin solid DodgerBlue }
+h1, h2, h3 { color: #483D8B; }
+table { border: thin solid #1e90ff; }
+tr { border: thin solid #1e90ff; }
+td { border: thin solid #1e90ff; }
+th { border: thin solid #1e90ff; }
 #logo {float: left;}
 #logo img {border: none;}
 #page_title {padding-top: 27px;}
@@ -13,3 +13,16 @@ div.warning {
   font-size: 110%;
   font-weight: bold;
 }
+
+tr.task-monitor-COMPLETE td {
+  background-color: #afa;
+}
+
+tr.task-monitor-IDLE td {
+  background-color: #ccc;
+  font-style: italic;
+}
+
+tr.task-monitor-ABORTED td {
+  background-color: #ffa;
+}

Added: hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/index.html
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/index.html?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/index.html (added)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/index.html Tue Oct 11 17:42:14 2011
@@ -0,0 +1 @@
+<meta HTTP-EQUIV="REFRESH" content="0;url=taskmonitor.jsp"/>

Added: hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp (added)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp Tue Oct 11 17:42:14 2011
@@ -0,0 +1,60 @@
+<%@ page contentType="text/html;charset=UTF-8"
+  import="java.util.*"
+  import="org.apache.hadoop.hbase.monitoring.MonitoredTask"
+  import="org.apache.hadoop.hbase.monitoring.TaskMonitor" %><%!
+  /**
+   * Returns String.valueOf(o) with HTML metacharacters escaped. Task
+   * descriptions embed region names and log paths, and callers pass
+   * stringified stack traces (with tokens like "<init>") to abort(), so
+   * this text must not be written into the page raw.
+   */
+  private static String escapeHtml(Object o) {
+    return String.valueOf(o).replace("&", "&amp;").replace("<", "&lt;")
+        .replace(">", "&gt;").replace("\"", "&quot;");
+  }
+%><%
+  TaskMonitor taskMonitor = TaskMonitor.get();
+  long now = System.currentTimeMillis();
+  // Copy before reversing so a list the monitor may share is never
+  // reordered; the page lists the newest tasks first.
+  List<MonitoredTask> tasks =
+      new ArrayList<MonitoredTask>(taskMonitor.getTasks());
+  Collections.reverse(tasks);
+%><?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
+  "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+  <head><meta http-equiv="Content-Type" content="text/html;charset=UTF-8"/>
+  <title>Task Monitor</title>
+<link rel="stylesheet" type="text/css" href="/static/hbase.css" />
+</head>
+<body>
+  <a id="logo" href="http://wiki.apache.org/lucene-hadoop/Hbase">
+    <img src="/static/hbase_logo_med.gif" alt="HBase Logo" title="HBase Logo" />
+  </a>
+  <h1 id="page_title">Task Monitor</h1>
+  <h2>Recent tasks</h2>
+  <% if(tasks.isEmpty()) { %>
+    <p>No tasks currently running on this node.</p>
+  <% } else { %>
+    <table>
+      <tr>
+        <th>Description</th>
+        <th>Status</th>
+        <th>Age</th>
+      </tr>
+      <% for(MonitoredTask task : tasks) { %>
+        <tr class="task-monitor-<%= task.getState() %>">
+          <td><%= escapeHtml(task.getDescription()) %></td>
+          <td><%= escapeHtml(task.getStatus()) %></td>
+          <td><%= now - task.getStartTime() %>ms
+            <% if(task.getCompletionTimestamp() != -1) { %>
+              (Completed <%= now - task.getCompletionTimestamp() %>ms ago)
+            <% } %>
+          </td>
+        </tr>
+      <% } %>
+    </table>
+  <% } %>
+</body>
+</html>

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java Tue Oct 11 17:42:14 2011
@@ -0,0 +1,125 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link TaskMonitor}: registering tasks, completion and expiry,
+ * abort-on-leak detection, and the {@link TaskMonitor#MAX_TASKS} cap.
+ */
+public class TestTaskMonitor {
+
+  /** Upper bound on GC rounds while waiting for a leaked task to abort. */
+  private static final int MAX_GC_ATTEMPTS = 50;
+
+  /**
+   * A new task is listed and RUNNING; once marked complete it is COMPLETE
+   * and still listed, and after its expiry it is dropped from the list.
+   */
+  @Test
+  public void testTaskMonitorBasics() {
+    TaskMonitor tm = new TaskMonitor();
+    assertTrue("Task monitor should start empty",
+        tm.getTasks().isEmpty());
+
+    // Make a task and fetch it back out
+    MonitoredTask task = tm.createStatus("Test task");
+    MonitoredTask taskFromTm = tm.getTasks().get(0);
+
+    // Make sure the state is reasonable.
+    assertEquals(task.getDescription(), taskFromTm.getDescription());
+    assertEquals(-1, taskFromTm.getCompletionTimestamp());
+    assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
+
+    // Mark it as finished
+    task.markComplete("Finished!");
+    assertEquals(MonitoredTask.State.COMPLETE, taskFromTm.getState());
+
+    // It should still show up in the TaskMonitor list
+    assertEquals(1, tm.getTasks().size());
+
+    // If we mark its completion time back a few minutes, it should get gced
+    ((MonitoredTaskImpl)taskFromTm).expireNow();
+    assertEquals(0, tm.getTasks().size());
+  }
+
+  /**
+   * A task whose handle is dropped without being completed must be reported
+   * as ABORTED once the leaked reference has been garbage collected.
+   */
+  @Test
+  public void testTasksGetAbortedOnLeak() throws InterruptedException {
+    final TaskMonitor tm = new TaskMonitor();
+    assertTrue("Task monitor should start empty",
+        tm.getTasks().isEmpty());
+
+    final AtomicBoolean threadSuccess = new AtomicBoolean(false);
+    // Make a task in some other thread and leak it
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        MonitoredTask task = tm.createStatus("Test task");
+        assertEquals(MonitoredTask.State.RUNNING, task.getState());
+        threadSuccess.set(true);
+      }
+    };
+    t.start();
+    t.join();
+    // Make sure the thread saw the correct state
+    assertTrue(threadSuccess.get());
+
+    // System.gc() is only a hint, so a fixed number of calls is not
+    // guaranteed to clear the leaked reference: retry a bounded number of
+    // times instead. No task reference is kept in a local across the GC
+    // calls, so the test itself cannot keep the leaked task reachable.
+    for (int i = 0; i < MAX_GC_ATTEMPTS && tm.getTasks().get(0).getState()
+        != MonitoredTask.State.ABORTED; i++) {
+      System.gc();
+      Thread.sleep(10);
+    }
+
+    // Now it should be aborted
+    MonitoredTask taskFromTm = tm.getTasks().get(0);
+    assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
+  }
+
+  /**
+   * Creating more than MAX_TASKS tasks keeps only the newest MAX_TASKS,
+   * culling the oldest ones first.
+   */
+  @Test
+  public void testTaskLimit() throws Exception {
+    TaskMonitor tm = new TaskMonitor();
+    for (int i = 0; i < TaskMonitor.MAX_TASKS + 10; i++) {
+      tm.createStatus("task " + i);
+    }
+    // Make sure it was limited correctly
+    assertEquals(TaskMonitor.MAX_TASKS, tm.getTasks().size());
+    // Make sure we culled the earlier tasks, not later
+    // (i.e. tasks 0 through 9 should have been deleted)
+    assertEquals("task 10", tm.getTasks().get(0).getDescription());
+  }
+
+}

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Tue Oct 11 17:42:14 2011
@@ -54,12 +54,14 @@ import org.apache.hadoop.hbase.client.De
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 
 import com.google.common.base.Joiner;
+import org.mockito.Mockito;
 
 /**
  * Test class fosr the Store
@@ -497,7 +499,7 @@ public class TestStore extends TestCase 
   private static void flushStore(Store store, long id) throws IOException {
     StoreFlusher storeFlusher = store.getStoreFlusher(id);
     storeFlusher.prepare();
-    storeFlusher.flushCache();
+    storeFlusher.flushCache(Mockito.mock(MonitoredTask.class)); // status updates are not checked by this helper
     storeFlusher.commit();
   }
 

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Tue Oct 11 17:42:14 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Ge
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -52,6 +53,7 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Test replay of edits out of a WAL split.
@@ -373,9 +375,11 @@ public class TestWALReplay {
     try {
       final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri,
           null) {
-        protected boolean internalFlushcache(HLog wal, long myseqid)
+        protected boolean internalFlushcache(HLog wal, long myseqid,
+            MonitoredTask status)
         throws IOException {
-          boolean b = super.internalFlushcache(wal, myseqid);
+          // Forward the caller's status instead of a throwaway mock.
+          boolean b = super.internalFlushcache(wal, myseqid, status);
           flushcount.incrementAndGet();
           return b;
         };
@@ -470,4 +474,4 @@ public class TestWALReplay {
     HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
     return wal;
   }
-}
\ No newline at end of file
+}