You are viewing a plain text version of this content. The canonical link to the original page was a hyperlink in the HTML version and is not preserved in this text.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:42:15 UTC
svn commit: r1181923 - in /hbase/branches/0.89: ./
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/monitoring/
src/main/java/org/apache/hadoop/hbase/regionserver/
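src/main/java/org/apache/hadoop/hbase/regionserver/wal/
src/main/java/org/apache/hadoop/hbase/util/
src/main/resources/hbase-webapps/master/
src/main/resources/hbase-webapps/regionserver/
src/main/resources/hbase-webapps/static/
src/main/resources/hbase-webapps/taskmonitor/
src/test/java/org/apache/hadoop/hbase/monitoring/
src/test/java/org/apache/hadoop/hbase/regionserver/
src/test/java/org/apache/hadoop/hbase/regionserver/wal/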
Author: nspiegelberg
Date: Tue Oct 11 17:42:14 2011
New Revision: 1181923
URL: http://svn.apache.org/viewvc?rev=1181923&view=rev
Log:
Port TaskMonitor & MonitoredTask from open source to hbase-90
Summary:
This is a port of the task monitor developed in the open source branch:
https://issues.apache.org/jira/browse/HBASE-3836
I also port the basic functionality of monitoring HMaster and HRegion with
these new tools.
The immediate purpose of this port is to allow implementation of an hbase "show
processlist" (#620988) in a manner that will not require redundancy when
committing to open source.
Test Plan: Ran all unit tests and ran load tests on Nicolas's dev cluster.
Reviewed By: nspiegelberg
Reviewers: nspiegelberg
CC: hbase@lists, riley, nspiegelberg, lshepard, mattwkelly
Differential Revision: 278835
Added:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/
hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/index.html
hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
Modified:
hbase/branches/0.89/pom.xml
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java
hbase/branches/0.89/src/main/resources/hbase-webapps/master/master.jsp
hbase/branches/0.89/src/main/resources/hbase-webapps/regionserver/regionserver.jsp
hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Modified: hbase/branches/0.89/pom.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/pom.xml?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/pom.xml (original)
+++ hbase/branches/0.89/pom.xml Tue Oct 11 17:42:14 2011
@@ -382,6 +382,12 @@
package="org.apache.hadoop.hbase.generated.regionserver"
webxml="${build.webapps}/regionserver/WEB-INF/web.xml"/>
+ <mkdir dir="${build.webapps}/taskmonitor/WEB-INF"/>
+ <jspcompiler uriroot="${src.webapps}/taskmonitor"
+ outputdir="${generated.sources}"
+ package="org.apache.hadoop.hbase.generated.taskmonitor"
+ webxml="${build.webapps}/taskmonitor/WEB-INF/web.xml"/>
+
<exec executable="sh">
<arg line="${basedir}/src/saveVersion.sh ${project.version} ${generated.sources}"/>
</exec>
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct 11 17:42:14 2011
@@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.ipc.HMast
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -540,14 +542,20 @@ public class HMaster extends Thread impl
/** Main processing loop */
@Override
public void run() {
- try {
- joinCluster();
- initPreferredAssignment();
- startServiceThreads();
- }catch (IOException e) {
- LOG.fatal("Unhandled exception. Master quits.", e);
- return;
- }
+ MonitoredTask startupStatus =
+ TaskMonitor.get().createStatus("Master startup");
+ startupStatus.setDescription("Master startup");
+ try {
+ joinCluster();
+ initPreferredAssignment();
+ startupStatus.setStatus("Initializing master service threads");
+ startServiceThreads();
+ startupStatus.markComplete("Initialization successful");
+ } catch (IOException e) {
+ LOG.fatal("Unhandled exception. Master quits.", e);
+ startupStatus.cleanup();
+ return;
+ }
try {
/* Main processing loop */
FINISHED: while (!this.closed.get()) {
@@ -579,9 +587,11 @@ public class HMaster extends Thread impl
}
} catch (Throwable t) {
LOG.fatal("Unhandled exception. Starting shutdown.", t);
+ startupStatus.cleanup();
this.closed.set(true);
}
+ startupStatus.cleanup();
if (!this.shutdownRequested.get()) { // shutdown not by request
shutdown(); // indicated that master is shutting down
startShutdown(); // get started with shutdown: stop scanners etc.
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java Tue Oct 11 17:42:14 2011
@@ -0,0 +1,53 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+public interface MonitoredTask {
+ enum State {
+ RUNNING,
+ COMPLETE,
+ ABORTED;
+ }
+
+ public abstract long getStartTime();
+
+ public abstract String getDescription();
+
+ public abstract String getStatus();
+
+ public abstract State getState();
+
+ public abstract long getCompletionTimestamp();
+
+ public abstract void markComplete(String msg);
+ public abstract void abort(String msg);
+
+ public abstract void setStatus(String status);
+
+ public abstract void setDescription(String description);
+
+ /**
+ * Explicitly mark this status as able to be cleaned up,
+ * even though it might not be complete.
+ */
+ public abstract void cleanup();
+
+
+}
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java Tue Oct 11 17:42:14 2011
@@ -0,0 +1,102 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Default {@link MonitoredTask} implementation. Instances are created by
+ * {@link TaskMonitor#createStatus(String)} and handed to callers wrapped in a
+ * proxy, while {@link TaskMonitor#getTasks()} returns the instances
+ * themselves to whichever thread asks for them.
+ *
+ * <p>Because the task's own thread writes these fields and other threads read
+ * them, the mutable fields are {@code volatile} so readers see current values.
+ * NOTE: individual transitions (e.g. {@link #cleanup()}) are not atomic with
+ * respect to one another.
+ */
+class MonitoredTaskImpl implements MonitoredTask {
+  private final long startTime;
+  // -1 until the task completes, aborts, or is cleaned up.
+  private volatile long completionTimestamp = -1;
+
+  private volatile String status;
+  private volatile String description;
+
+  private volatile State state = State.RUNNING;
+
+  public MonitoredTaskImpl() {
+    startTime = System.currentTimeMillis();
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public String getDescription() {
+    return description;
+  }
+
+  @Override
+  public String getStatus() {
+    return status;
+  }
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  @Override
+  public long getCompletionTimestamp() {
+    return completionTimestamp;
+  }
+
+  @Override
+  public void markComplete(String msg) {
+    // Publish the final message before the state change (same order as
+    // abort) so a reader that observes COMPLETE also sees the final status.
+    setStatus(msg);
+    state = State.COMPLETE;
+    completionTimestamp = System.currentTimeMillis();
+  }
+
+  @Override
+  public void abort(String msg) {
+    setStatus(msg);
+    state = State.ABORTED;
+    completionTimestamp = System.currentTimeMillis();
+  }
+
+  @Override
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  @Override
+  public void setDescription(String description) {
+    this.description = description;
+  }
+
+  @Override
+  public void cleanup() {
+    // Only a still-running task is affected; completed or aborted tasks keep
+    // their state and completion time.
+    if (state == State.RUNNING) {
+      state = State.ABORTED;
+      completionTimestamp = System.currentTimeMillis();
+    }
+  }
+
+  /**
+   * Force the completion timestamp backwards so that
+   * it expires now (180 s is longer than TaskMonitor's 60 s expiration).
+   * Only meaningful once the task has finished; a running task's -1
+   * timestamp just becomes more negative.
+   */
+  @VisibleForTesting
+  void expireNow() {
+    completionTimestamp -= 180 * 1000;
+  }
+
+  /**
+   * Human-readable summary, used e.g. by TaskMonitor's leaked-task warning.
+   */
+  @Override
+  public String toString() {
+    return "MonitoredTask[description=" + description
+        + ", status=" + status
+        + ", state=" + state
+        + ", startTime=" + startTime
+        + ", completionTimestamp=" + completionTimestamp + "]";
+  }
+}
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java Tue Oct 11 17:42:14 2011
@@ -0,0 +1,176 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import java.lang.ref.WeakReference;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
+/**
+ * Singleton which keeps track of tasks going on in this VM.
+ * A Task here is anything which takes more than a few seconds
+ * and the user might want to inquire about the status.
+ *
+ * <p>All access to the task list is synchronized on this instance.
+ */
+public class TaskMonitor {
+  private static final Log LOG = LogFactory.getLog(TaskMonitor.class);
+
+  // Don't keep around any tasks that have completed more than
+  // 60 seconds ago
+  private static final long EXPIRATION_TIME = 60*1000;
+
+  @VisibleForTesting
+  static final int MAX_TASKS = 1000;
+
+  private static TaskMonitor instance;
+  // Oldest first. Guarded by "this".
+  private final List<TaskAndWeakRefPair> tasks = Lists.newArrayList();
+
+  /**
+   * Get singleton instance.
+   * TODO this would be better off scoped to a single daemon
+   */
+  public static synchronized TaskMonitor get() {
+    if (instance == null) {
+      instance = new TaskMonitor();
+    }
+    return instance;
+  }
+
+  /**
+   * Creates and registers a new task in the RUNNING state.
+   *
+   * <p>Synchronized because {@link #tasks} is a plain list that
+   * {@link #getTasks()} iterates and modifies under the same monitor.
+   *
+   * @param description overall description of the task
+   * @return a proxy for the new task. Callers should keep it until the task
+   *         finishes: losing the last reference to the proxy is how leaked
+   *         tasks are detected.
+   */
+  public synchronized MonitoredTask createStatus(String description) {
+    MonitoredTask stat = new MonitoredTaskImpl();
+    stat.setDescription(description);
+    MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(
+        stat.getClass().getClassLoader(),
+        new Class<?>[] { MonitoredTask.class },
+        new PassthroughInvocationHandler<MonitoredTask>(stat));
+
+    tasks.add(new TaskAndWeakRefPair(stat, proxy));
+    // Without this, the list would only shrink when someone calls getTasks(),
+    // so it would grow without bound on a daemon nobody is monitoring.
+    if (tasks.size() > MAX_TASKS) {
+      purgeExpiredTasks();
+    }
+    return proxy;
+  }
+
+  /**
+   * Marks leaked tasks as aborted, drops tasks that finished more than
+   * EXPIRATION_TIME ago, then trims the oldest entries beyond MAX_TASKS.
+   */
+  private synchronized void purgeExpiredTasks() {
+    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
+        it.hasNext();) {
+      TaskAndWeakRefPair pair = it.next();
+      MonitoredTask stat = pair.get();
+
+      if (pair.isDead()) {
+        // The class who constructed this leaked it. So we can
+        // assume it's done.
+        if (stat.getState() == MonitoredTask.State.RUNNING) {
+          LOG.warn("Status " + stat + " appears to have been leaked");
+          stat.cleanup();
+        }
+      }
+
+      if (canPurge(stat)) {
+        it.remove();
+      }
+    }
+
+    int size = tasks.size();
+    if (size > MAX_TASKS) {
+      LOG.warn("Too many actions in action monitor! Purging some.");
+      // Remove the oldest entries in place. Reassigning tasks to a subList
+      // view would keep every dropped entry reachable via the backing list.
+      tasks.subList(0, size - MAX_TASKS).clear();
+    }
+  }
+
+  /**
+   * @return a snapshot of the currently tracked tasks, oldest first. The
+   *         elements are the live task objects, so their status can keep
+   *         changing after this call returns.
+   */
+  public synchronized List<MonitoredTask> getTasks() {
+    purgeExpiredTasks();
+    ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
+    for (TaskAndWeakRefPair pair : tasks) {
+      ret.add(pair.get());
+    }
+    return ret;
+  }
+
+  /** @return true if the task finished more than EXPIRATION_TIME ago */
+  private boolean canPurge(MonitoredTask stat) {
+    long cts = stat.getCompletionTimestamp();
+    return (cts > 0 && System.currentTimeMillis() - cts > EXPIRATION_TIME);
+  }
+
+  /**
+   * This class encapsulates an object as well as a weak reference to a proxy
+   * that passes through calls to that object. In art form:
+   * <pre>
+   *   Proxy &lt;-------------------------+
+   *     |                              |
+   *     v                              | weak reference
+   *   PassthroughInvocationHandler     |
+   *     |                              |
+   *     v                              |
+   *   MonitoredTaskImpl &lt;------- TaskAndWeakRefPair
+   *                    strong reference
+   * </pre>
+   *
+   * Since we only return the Proxy to the creator of the MonitoredTask,
+   * they can leak that object, and we'll detect it because our weak
+   * reference will go null. But we still have the actual object, so we can
+   * log it and display it as a leaked (incomplete) action.
+   */
+  private static class TaskAndWeakRefPair {
+    private final MonitoredTask impl;
+    private final WeakReference<MonitoredTask> weakProxy;
+
+    public TaskAndWeakRefPair(MonitoredTask stat,
+        MonitoredTask proxy) {
+      this.impl = stat;
+      this.weakProxy = new WeakReference<MonitoredTask>(proxy);
+    }
+
+    public MonitoredTask get() {
+      return impl;
+    }
+
+    public boolean isDead() {
+      return weakProxy.get() == null;
+    }
+  }
+
+  /**
+   * An InvocationHandler that simply passes through calls to the original
+   * object, rethrowing whatever that object throws.
+   */
+  private static class PassthroughInvocationHandler<T> implements InvocationHandler {
+    private final T delegatee;
+
+    public PassthroughInvocationHandler(T delegatee) {
+      this.delegatee = delegatee;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+      try {
+        return method.invoke(delegatee, args);
+      } catch (java.lang.reflect.InvocationTargetException e) {
+        // Method.invoke wraps the delegate's exception; unwrap it so callers
+        // see the original exception, not an UndeclaredThrowableException.
+        throw e.getCause();
+      }
+    }
+  }
+}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 17:42:14 2011
@@ -76,6 +76,8 @@ import org.apache.hadoop.hbase.io.HeapSi
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -471,15 +473,20 @@ public class HRegion implements HeapSize
*/
public long initialize(final Progressable reporter)
throws IOException {
+ MonitoredTask status = TaskMonitor.get().createStatus(
+ "Initializing region " + this);
// Write HRI to a file in case we need to recover .META.
+ status.setStatus("Writing region info on filesystem");
checkRegioninfoOnFilesystem();
// Remove temporary data left over from old regions
+ status.setStatus("Cleaning up temporary data from old regions");
cleanupTmpDir();
// Load in all the HStores. Get maximum seqid.
long maxSeqId = -1;
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
+ status.setStatus("Instantiating store for column family " + c);
Store store = instantiateHStore(this.tableDir, c);
this.stores.put(c.getName(), store);
long storeSeqId = store.getMaxSequenceId();
@@ -488,11 +495,13 @@ public class HRegion implements HeapSize
}
}
// Recover any edits if available.
- maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
+ maxSeqId = replayRecoveredEditsIfAny(
+ this.regiondir, maxSeqId, reporter, status);
// Get rid of any splits or merges that were lost in-progress. Clean out
// these directories here on open. We may be opening a region that was
// being split but we crashed in the middle of it all.
+ status.setStatus("Cleaning up detritus from prior splits");
FSUtils.deleteDirectory(this.fs, new Path(regiondir, SPLITDIR));
FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
@@ -507,6 +516,7 @@ public class HRegion implements HeapSize
// (particularly if no recovered edits, seqid will be -1).
long nextSeqid = maxSeqId + 1;
LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
+ status.markComplete("Region opened successfully");
return nextSeqid;
}
@@ -647,11 +657,16 @@ public class HRegion implements HeapSize
* @throws IOException e
*/
public List<StoreFile> close(final boolean abort) throws IOException {
+ MonitoredTask status = TaskMonitor.get().createStatus(
+ "Closing region " + this + (abort ? " due to abort" : ""));
if (isClosed()) {
+ status.abort("Already got closed by another process");
LOG.warn("region " + this + " already closed");
return null;
}
+ status.setStatus("Waiting for split lock");
synchronized (splitLock) {
+ status.setStatus("Disabling compacts and flushes for region");
boolean wasFlushing = false;
synchronized (writestate) {
// Disable compacting and flushing by background threads for this
@@ -674,11 +689,13 @@ public class HRegion implements HeapSize
// that will clear out of the bulk of the memstore before we put up
// the close flag?
if (!abort && !wasFlushing && worthPreFlushing()) {
+ status.setStatus("Pre-flushing region before close");
LOG.info("Running close preflush of " + this.getRegionNameAsString());
- internalFlushcache();
+ internalFlushcache(status);
}
newScannerLock.writeLock().lock();
this.closing.set(true);
+ status.setStatus("Disabling writes for close");
try {
splitsAndClosesLock.writeLock().lock();
LOG.debug("Updates disabled for region, no outstanding scanners on " +
@@ -692,7 +709,7 @@ public class HRegion implements HeapSize
// Don't flush the cache if we are aborting
if (!abort) {
- internalFlushcache();
+ internalFlushcache(status);
}
List<StoreFile> result = new ArrayList<StoreFile>();
@@ -700,6 +717,7 @@ public class HRegion implements HeapSize
result.addAll(store.close());
}
this.closed.set(true);
+ status.markComplete("Closed");
LOG.info("Closed " + this);
return result;
} finally {
@@ -1084,7 +1102,9 @@ public class HRegion implements HeapSize
* because a Snapshot was not properly persisted.
*/
public boolean flushcache() throws IOException {
+ MonitoredTask status = TaskMonitor.get().createStatus("Flushing" + this);
if (this.closed.get()) {
+ status.abort("Skipped: closed");
return false;
}
synchronized (writestate) {
@@ -1097,14 +1117,19 @@ public class HRegion implements HeapSize
writestate.flushing + ", writesEnabled=" +
writestate.writesEnabled);
}
+ status.abort("Not flushing since " + (writestate.flushing ?
+ "already flushing" : "writes not enabled"));
return false;
}
}
try {
// Prevent splits and closes
+ status.setStatus("Acquiring readlock on region");
splitsAndClosesLock.readLock().lock();
try {
- return internalFlushcache();
+ boolean result = internalFlushcache(status);
+ status.markComplete("Flush successful");
+ return result;
} finally {
splitsAndClosesLock.readLock().unlock();
}
@@ -1113,6 +1138,7 @@ public class HRegion implements HeapSize
writestate.flushing = false;
this.writestate.flushRequested = false;
writestate.notifyAll();
+ status.cleanup();
}
}
}
@@ -1144,6 +1170,7 @@ public class HRegion implements HeapSize
* routes.
*
* <p> This method may block for some time.
+ * @param status
*
* @return true if the region needs compacting
*
@@ -1151,20 +1178,22 @@ public class HRegion implements HeapSize
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
- protected boolean internalFlushcache() throws IOException {
- return internalFlushcache(this.log, -1);
+ protected boolean internalFlushcache(MonitoredTask status)
+ throws IOException {
+ return internalFlushcache(this.log, -1, status);
}
/**
* @param wal Null if we're NOT to go via hlog/wal.
* @param myseqid The seqid to use if <code>wal</code> is null writing out
* flush file.
+ * @param status
* @return true if the region needs compacting
* @throws IOException
* @see {@link #internalFlushcache()}
*/
- protected boolean internalFlushcache(final HLog wal, final long myseqid)
- throws IOException {
+ protected boolean internalFlushcache(final HLog wal, final long myseqid,
+ MonitoredTask status) throws IOException {
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
// Clear flush flag.
// Record latest flush time
@@ -1193,7 +1222,9 @@ public class HRegion implements HeapSize
// We have to take a write lock during snapshot, or else a write could
// end up in both snapshot and memstore (makes it difficult to do atomic
// rows then)
+ status.setStatus("Obtaining lock to block concurrent updates");
this.updatesLock.writeLock().lock();
+ status.setStatus("Preparing to flush by snapshotting stores");
final long currentMemStoreSize = this.memstoreSize.get();
//copy the array of per column family memstore values
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(
@@ -1214,6 +1245,7 @@ public class HRegion implements HeapSize
this.updatesLock.writeLock().unlock();
}
+ status.setStatus("Flushing stores");
LOG.debug("Finished snapshotting, commencing flushing stores");
// Any failure from here on out will be catastrophic requiring server
@@ -1227,7 +1259,7 @@ public class HRegion implements HeapSize
// just-made new flush store file.
for (StoreFlusher flusher : storeFlushers) {
- flusher.flushCache();
+ flusher.flushCache(status);
}
Callable<Void> atomicWork = internalPreFlushcacheCommit();
@@ -1276,6 +1308,7 @@ public class HRegion implements HeapSize
DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
Bytes.toStringBinary(getRegionName()));
dse.initCause(t);
+ status.abort("Flush failed: " + StringUtils.stringifyException(t));
throw dse;
}
@@ -1305,6 +1338,7 @@ public class HRegion implements HeapSize
this + " in " + time + "ms, sequence id=" + sequenceId +
", compaction requested=" + compactionRequested +
((wal == null)? "; wal=null": ""));
+ status.setStatus("Finished memstore flush");
}
this.recentFlushes.add(new Pair<Long,Long>(time/1000,currentMemStoreSize));
@@ -2173,13 +2207,14 @@ public class HRegion implements HeapSize
* @param minSeqId Any edit found in split editlogs needs to be in excess of
* this minSeqId to be applied, else its skipped.
* @param reporter
+ * @param status
* @return the sequence id of the last edit added to this region out of the
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws UnsupportedEncodingException
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(final Path regiondir,
- final long minSeqId, final Progressable reporter)
+ final long minSeqId, final Progressable reporter, MonitoredTask status)
throws UnsupportedEncodingException, IOException {
long seqid = minSeqId;
NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
@@ -2205,7 +2240,7 @@ public class HRegion implements HeapSize
}
if (seqid > minSeqId) {
// Then we added some edits to memory. Flush and cleanup split edit files.
- internalFlushcache(null, seqid);
+ internalFlushcache(null, seqid, status);
}
// Now delete the content of recovered edits. We're done w/ them.
for (Path file: files) {
@@ -2230,7 +2265,10 @@ public class HRegion implements HeapSize
private long replayRecoveredEdits(final Path edits,
final long minSeqId, final Progressable reporter)
throws IOException {
- LOG.info("Replaying edits from " + edits + "; minSeqId=" + minSeqId);
+ String msg = "Replaying edits from " + edits + "; minSeqId=" + minSeqId;
+ LOG.info(msg);
+ MonitoredTask status = TaskMonitor.get().createStatus(msg);
+ status.setStatus("Opening logs");
HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
try {
long currentEditSeqId = minSeqId;
@@ -2281,40 +2319,51 @@ public class HRegion implements HeapSize
flush = restoreEdit(store, kv);
editsCount++;
}
- if (flush) internalFlushcache(null, currentEditSeqId);
+ if (flush) internalFlushcache(null, currentEditSeqId, status);
// Every 'interval' edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3minutes to complete.
if (reporter != null && (editsCount % interval) == 0) {
+ status.setStatus("Replaying edits..." +
+ " skipped=" + skippedEdits +
+ " edits=" + editsCount);
reporter.progress();
}
}
} catch (EOFException eof) {
Path p = HLog.moveAsideBadEditsFile(fs, edits);
- LOG.warn("Encountered EOF. Most likely due to Master failure during " +
+ msg = "Encountered EOF. Most likely due to Master failure during " +
"log spliting, so we have this data in another edit. " +
- "Continuing, but renaming " + edits + " as " + p, eof);
+ "Continuing, but renaming " + edits + " as " + p;
+ LOG.warn(msg, eof);
+ status.setStatus(msg);
} catch (IOException ioe) {
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (ioe.getCause() instanceof ParseException) {
Path p = HLog.moveAsideBadEditsFile(fs, edits);
- LOG.warn("File corruption encountered! " +
- "Continuing, but renaming " + edits + " as " + p, ioe);
+ msg = "File corruption encountered! " +
+ "Continuing, but renaming " + edits + " as " + p;
+ LOG.warn(msg, ioe);
+ status.setStatus(msg);
} else {
// other IO errors may be transient (bad network connection,
// checksum exception on one datanode, etc). throw & retry
+ status.abort(StringUtils.stringifyException(ioe));
throw ioe;
}
}
+ msg = "Applied " + editsCount + ", skipped " + skippedEdits +
+ ", firstSeqIdInLog=" + firstSeqIdInLog +
+ ", maxSeqIdInLog=" + currentEditSeqId;
+ status.markComplete(msg);
if (LOG.isDebugEnabled()) {
- LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
- ", firstSeqIdInLog=" + firstSeqIdInLog +
- ", maxSeqIdInLog=" + currentEditSeqId);
+ LOG.debug(msg);
}
return currentEditSeqId;
} finally {
reader.close();
+ status.cleanup();
}
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 17:42:14 2011
@@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.io.HeapSi
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -445,28 +447,34 @@ public class Store implements HeapSize {
* previously.
* @param logCacheFlushId flush sequence number
* @param snapshot
+ * @param snapshotTimeRangeTracker
+ * @param status
* @return true if a compaction is needed
* @throws IOException
*/
private StoreFile flushCache(final long logCacheFlushId,
SortedSet<KeyValue> snapshot,
- TimeRangeTracker snapshotTimeRangeTracker) throws IOException {
+ TimeRangeTracker snapshotTimeRangeTracker,
+ MonitoredTask status) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
- return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker);
+ return internalFlushCache(snapshot, logCacheFlushId,
+ snapshotTimeRangeTracker, status);
}
/*
* @param cache
* @param logCacheFlushId
+ * @param snapshotTimeRangeTracker
+ * @param status
* @return StoreFile created.
* @throws IOException
*/
private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
final long logCacheFlushId,
- TimeRangeTracker snapshotTimeRangeTracker)
- throws IOException {
+ TimeRangeTracker snapshotTimeRangeTracker,
+ MonitoredTask status) throws IOException {
StoreFile.Writer writer;
String fileName;
long flushed = 0;
@@ -479,6 +487,7 @@ public class Store implements HeapSize {
// flush to list of store files. Add cleanup of anything put on filesystem
// if we fail.
synchronized (flushLock) {
+ status.setStatus("Flushing " + this + ": creating writer");
// A. Write the map out to the disk
writer = createWriterInTmp(set.size());
writer.setTimeRangeTracker(snapshotTimeRangeTracker);
@@ -495,7 +504,9 @@ public class Store implements HeapSize {
} finally {
// Write out the log sequence number that corresponds to this output
// hfile. The hfile is current up to and including logCacheFlushId.
+ status.setStatus("Flushing " + this + ": appending metadata");
writer.appendMetadata(logCacheFlushId, false);
+ status.setStatus("Flushing " + this + ": closing flushed file");
writer.close();
}
}
@@ -628,6 +639,10 @@ public class Store implements HeapSize {
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
// Ready to go. Have list of files to compact.
+ MonitoredTask status = TaskMonitor.get().createStatus(
+ (cr.isMajor() ? "Major " : "") + "Compaction of "
+ + this.storeNameStr + " on "
+ + this.region.getRegionInfo().getRegionNameAsString());
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+ this.storeNameStr + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
@@ -636,6 +651,7 @@ public class Store implements HeapSize {
StoreFile sf = null;
try {
+ status.setStatus("Compacting " + filesToCompact.size() + " file(s)");
StoreFile.Writer writer = compactStores(filesToCompact, cr.isMajor(), maxId);
// Move the compaction into place.
sf = completeCompaction(filesToCompact, writer);
@@ -645,6 +661,7 @@ public class Store implements HeapSize {
}
}
+ status.markComplete("Completed compaction");
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+ filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
@@ -1690,8 +1707,9 @@ public class Store implements HeapSize {
}
@Override
- public void flushCache() throws IOException {
- storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker);
+ public void flushCache(MonitoredTask status) throws IOException {
+ storeFile = Store.this.flushCache(cacheFlushId, snapshot,
+ snapshotTimeRangeTracker, status); // forwarded so the enclosing Store reports flush progress on the caller's task
}
@Override
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Tue Oct 11 17:42:14 2011
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+
/**
* A package protected interface for a store flushing.
* A store flusher carries the state required to prepare/flush/commit the
@@ -43,9 +45,10 @@ interface StoreFlusher {
* A length operation which doesn't require locking out any function
* of the store.
*
+ * @param status task on which flush progress is reported
* @throws IOException in case the flush fails
*/
- void flushCache() throws IOException;
+ void flushCache(MonitoredTask status) throws IOException;
/**
* Commit the flush - add the store file to the store and clear the
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Oct 11 17:42:14 2011
@@ -74,6 +74,8 @@ import org.apache.hadoop.hbase.HServerIn
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -1252,11 +1254,14 @@ public class HLog implements Syncable {
public static List<Path> splitLog(final Path rootDir, final Path srcDir,
Path oldLogDir, final FileSystem fs, final Configuration conf)
throws IOException {
-
+ MonitoredTask status = TaskMonitor.get().createStatus(
+ "Splitting logs in " + srcDir);
long startTime = System.currentTimeMillis();
List<Path> splits = null;
+ status.setStatus("Determining files to split");
if (!fs.exists(srcDir)) {
// Nothing to do
+ status.markComplete("No log directory existed to split.");
return splits;
}
FileStatus [] logfiles = fs.listStatus(srcDir);
@@ -1266,6 +1271,7 @@ public class HLog implements Syncable {
}
LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
srcDir.toString());
+ status.setStatus("Performing split");
splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
try {
FileStatus[] files = fs.listStatus(srcDir);
@@ -1285,6 +1291,7 @@ public class HLog implements Syncable {
throw io;
}
lastSplitTime = System.currentTimeMillis() - startTime;
+ status.markComplete("Log splits complete.");
LOG.info("hlog file splitting completed in " + lastSplitTime +
" ms for " + srcDir.toString());
return splits;
@@ -1354,6 +1361,8 @@ public class HLog implements Syncable {
Collections.synchronizedMap(
new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
List<Path> splits = null;
+ MonitoredTask status = TaskMonitor.get().createStatus(
+ "Splitting logs in " + srcDir);
// Number of logs in a read batch
// More means faster but bigger mem consumption
@@ -1363,6 +1372,7 @@ public class HLog implements Syncable {
lastSplitSize = 0;
+ status.setStatus("Performing split");
try {
int i = -1;
while (i < logfiles.length) {
@@ -1388,6 +1398,7 @@ public class HLog implements Syncable {
LOG.warn("EOF from hlog " + logPath + ". continuing");
processedLogs.add(logPath);
} catch (InterruptedIOException iioe) {
+ status.abort(StringUtils.stringifyException(iioe));
throw iioe;
} catch (IOException e) {
// If the IOE resulted from bad file format,
@@ -1401,6 +1412,7 @@ public class HLog implements Syncable {
". Marking as corrupted", e);
corruptedLogs.add(logPath);
} else {
+ status.abort(StringUtils.stringifyException(e));
throw e;
}
}
@@ -1409,6 +1421,7 @@ public class HLog implements Syncable {
writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
}
if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
+ status.abort("Discovered orphan hlog after split");
throw new IOException("Discovered orphan hlog after split. Maybe " +
"HRegionServer was not dead when we started");
}
@@ -1420,7 +1433,9 @@ public class HLog implements Syncable {
LOG.debug("Closed " + wap.p);
}
}
+ status.setStatus("Archiving logs after completed split");
archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+ status.markComplete("Split completed");
return splits;
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/InfoServer.java Tue Oct 11 17:42:14 2011
@@ -24,6 +24,7 @@ import org.apache.hadoop.http.HttpServer
import org.mortbay.jetty.handler.ContextHandlerCollection;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
+import org.mortbay.jetty.webapp.WebAppContext;
import java.io.IOException;
import java.net.URL;
@@ -79,6 +80,12 @@ public class InfoServer extends HttpServ
logContext.addServlet(DefaultServlet.class, "/");
defaultContexts.put(logContext, true);
}
+ // Now bring up the task monitor
+ WebAppContext taskMonitorContext =
+ new WebAppContext(parent, "taskmontior", "/taskmonitor");
+ taskMonitorContext.addServlet(DefaultServlet.class, "/");
+ taskMonitorContext.setWar(appDir + "/taskmonitor");
+ defaultContexts.put(taskMonitorContext, true);
}
/**
Modified: hbase/branches/0.89/src/main/resources/hbase-webapps/master/master.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/master/master.jsp?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/master/master.jsp (original)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/master/master.jsp Tue Oct 11 17:42:14 2011
@@ -37,7 +37,7 @@
<body>
<a id="logo" href="http://wiki.apache.org/lucene-hadoop/Hbase"><img src="/static/hbase_logo_med.gif" alt="HBase Logo" title="HBase Logo" /></a>
<h1 id="page_title">Master: <%=master.getMasterAddress().getHostname()%>:<%=master.getMasterAddress().getPort()%></h1>
-<p id="links_menu"><a href="/logs/">Local logs</a>, <a href="/stacks">Thread Dump</a>, <a href="/logLevel">Log Level</a></p>
+<p id="links_menu"><a href="/logs/">Local logs</a>, <a href="/stacks">Thread Dump</a>, <a href="/logLevel">Log Level</a>, <a href="/taskmonitor">Task Monitor</a></p>
<!-- Various warnings that cluster admins should be aware of -->
<% if (JvmVersion.isBadJvmVersion()) { %>
Modified: hbase/branches/0.89/src/main/resources/hbase-webapps/regionserver/regionserver.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/regionserver/regionserver.jsp?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/regionserver/regionserver.jsp (original)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/regionserver/regionserver.jsp Tue Oct 11 17:42:14 2011
@@ -33,7 +33,7 @@
<body>
<a id="logo" href="http://wiki.apache.org/lucene-hadoop/Hbase"><img src="/static/hbase_logo_med.gif" alt="HBase Logo" title="HBase Logo" /></a>
<h1 id="page_title">Region Server: <%= serverInfo.getServerAddress().getHostname() %>:<%= serverInfo.getServerAddress().getPort() %></h1>
-<p id="links_menu"><a href="/logs/">Local logs</a>, <a href="/stacks">Thread Dump</a>, <a href="/logLevel">Log Level</a></p>
+<p id="links_menu"><a href="/logs/">Local logs</a>, <a href="/stacks">Thread Dump</a>, <a href="/logLevel">Log Level</a>, <a href="/taskmonitor">Task Monitor</a></p>
<hr id="head_rule" />
<h2>Region Server Attributes</h2>
Modified: hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css (original)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/static/hbase.css Tue Oct 11 17:42:14 2011
@@ -1,8 +1,8 @@
-h1, h2, h3 { color: DarkSlateBlue }
-table { border: thin solid DodgerBlue }
-tr { border: thin solid DodgerBlue }
-td { border: thin solid DodgerBlue }
-th { border: thin solid DodgerBlue }
+h1, h2, h3 { color: #483D8B; } /* DarkSlateBlue */
+table { border: thin solid #1e90ff; } /* DodgerBlue */
+tr { border: thin solid #1e90ff; } /* DodgerBlue */
+td { border: thin solid #1e90ff; } /* DodgerBlue */
+th { border: thin solid #1e90ff; } /* DodgerBlue */
#logo {float: left;}
#logo img {border: none;}
#page_title {padding-top: 27px;}
@@ -13,3 +13,16 @@ div.warning {
font-size: 110%;
font-weight: bold;
}
+
+tr.task-monitor-COMPLETE td { /* taskmonitor.jsp row whose task state is COMPLETE */
+ background-color: #afa;
+}
+
+tr.task-monitor-IDLE td { /* taskmonitor.jsp row whose task state is IDLE */
+ background-color: #ccc;
+ font-style: italic;
+}
+
+tr.task-monitor-ABORTED td { /* taskmonitor.jsp row whose task state is ABORTED */
+ background-color: #ffa;
+}
Added: hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/index.html
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/index.html?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/index.html (added)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/index.html Tue Oct 11 17:42:14 2011
@@ -0,0 +1 @@
+<meta HTTP-EQUIV="REFRESH" content="0;url=taskmonitor.jsp"/>
Added: hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp (added)
+++ hbase/branches/0.89/src/main/resources/hbase-webapps/taskmonitor/taskmonitor.jsp Tue Oct 11 17:42:14 2011
@@ -0,0 +1,74 @@
+<%@ page contentType="text/html;charset=UTF-8"
+  import="java.util.*"
+  import="org.apache.hadoop.hbase.monitoring.MonitoredTask"
+  import="org.apache.hadoop.hbase.monitoring.TaskMonitor" %><%!
+  /**
+   * Escapes text for safe inclusion in HTML element content or in a quoted
+   * attribute value. Task descriptions and statuses are built from region
+   * names, directory paths and exception messages, so they must not be
+   * written into the page verbatim. A null input renders as "".
+   */
+  private static String escapeHtml(String text) {
+    if (text == null) {
+      return "";
+    }
+    StringBuilder out = new StringBuilder(text.length());
+    for (int i = 0; i < text.length(); i++) {
+      char c = text.charAt(i);
+      switch (c) {
+        case '<':  out.append("&lt;");   break;
+        case '>':  out.append("&gt;");   break;
+        case '&':  out.append("&amp;");  break;
+        case '"':  out.append("&quot;"); break;
+        case '\'': out.append("&#39;");  break;
+        default:   out.append(c);
+      }
+    }
+    return out.toString();
+  }
+%><%
+  TaskMonitor taskMonitor = TaskMonitor.get();
+  long now = System.currentTimeMillis();
+  // Reverse so the most recently created tasks are listed first; reverse a
+  // private copy so the page never reorders a list it does not own.
+  List<MonitoredTask> tasks =
+      new ArrayList<MonitoredTask>(taskMonitor.getTasks());
+  Collections.reverse(tasks);
+%><?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
+  "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+<head><meta http-equiv="Content-Type" content="text/html;charset=UTF-8"/>
+<title>Task Monitor</title>
+<link rel="stylesheet" type="text/css" href="/static/hbase.css" />
+</head>
+<body>
+<a id="logo" href="http://wiki.apache.org/lucene-hadoop/Hbase">
+  <img src="/static/hbase_logo_med.gif" alt="HBase Logo" title="HBase Logo" />
+</a>
+<h1 id="page_title">Task Monitor</h1>
+<h2>Recent tasks</h2>
+<% if (tasks.isEmpty()) { %>
+<p>No tasks currently running on this node.</p>
+<% } else { %>
+<table>
+  <tr>
+    <th>Description</th>
+    <th>Status</th>
+    <th>Age</th>
+  </tr>
+  <% for (MonitoredTask task : tasks) { %>
+  <tr class="task-monitor-<%= task.getState() %>">
+    <td><%= escapeHtml(task.getDescription()) %></td>
+    <td><%= escapeHtml(task.getStatus()) %></td>
+    <td><%= now - task.getStartTime() %>ms
+      <% if (task.getCompletionTimestamp() != -1) { %>
+      (Completed <%= now - task.getCompletionTimestamp() %>ms ago)
+      <% } %>
+    </td>
+  </tr>
+  <% } %>
+</table>
+<% } %>
+</body>
+</html>
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java?rev=1181923&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java Tue Oct 11 17:42:14 2011
@@ -0,0 +1,112 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+/**
+ * Tests {@link TaskMonitor}: basic task lifecycle, aborting of leaked tasks,
+ * and the {@link TaskMonitor#MAX_TASKS} cap on retained tasks.
+ */
+public class TestTaskMonitor {
+
+  @Test
+  public void testTaskMonitorBasics() {
+    TaskMonitor tm = new TaskMonitor();
+    assertTrue("Task monitor should start empty",
+        tm.getTasks().isEmpty());
+
+    // Make a task and fetch it back out
+    MonitoredTask task = tm.createStatus("Test task");
+    MonitoredTask taskFromTm = tm.getTasks().get(0);
+
+    // Make sure the state is reasonable.
+    assertEquals(task.getDescription(), taskFromTm.getDescription());
+    assertEquals(-1, taskFromTm.getCompletionTimestamp());
+    assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
+
+    // Mark it as finished
+    task.markComplete("Finished!");
+    assertEquals(MonitoredTask.State.COMPLETE, taskFromTm.getState());
+
+    // It should still show up in the TaskMonitor list
+    assertEquals(1, tm.getTasks().size());
+
+    // If we mark its completion time back a few minutes, it should get gced
+    ((MonitoredTaskImpl)taskFromTm).expireNow();
+    assertEquals(0, tm.getTasks().size());
+  }
+
+  @Test
+  public void testTasksGetAbortedOnLeak() throws InterruptedException {
+    final TaskMonitor tm = new TaskMonitor();
+    assertTrue("Task monitor should start empty",
+        tm.getTasks().isEmpty());
+
+    final AtomicBoolean threadSuccess = new AtomicBoolean(false);
+    // Make a task in some other thread and leak it
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        MonitoredTask task = tm.createStatus("Test task");
+        assertEquals(MonitoredTask.State.RUNNING, task.getState());
+        threadSuccess.set(true);
+      }
+    };
+    t.start();
+    t.join();
+    // Make sure the thread saw the correct state
+    assertTrue(threadSuccess.get());
+
+    // Make sure the leaked reference gets cleared. System.gc() is only a
+    // hint, so retry a bounded number of times instead of assuming a fixed
+    // number of calls is enough. Only the state is kept between attempts,
+    // so this loop never holds a strong reference to the task itself.
+    MonitoredTask.State state = null;
+    for (int attempt = 0; attempt < 20; attempt++) {
+      System.gc();
+      // Now it should be aborted
+      state = tm.getTasks().get(0).getState();
+      if (state == MonitoredTask.State.ABORTED) {
+        break;
+      }
+      Thread.sleep(50);
+    }
+    assertEquals(MonitoredTask.State.ABORTED, state);
+  }
+
+  @Test
+  public void testTaskLimit() throws Exception {
+    TaskMonitor tm = new TaskMonitor();
+    for (int i = 0; i < TaskMonitor.MAX_TASKS + 10; i++) {
+      tm.createStatus("task " + i);
+    }
+    // Make sure it was limited correctly
+    assertEquals(TaskMonitor.MAX_TASKS, tm.getTasks().size());
+    // Make sure we culled the earlier tasks, not later
+    // (i.e. tasks 0 through 9 should have been deleted)
+    assertEquals("task 10", tm.getTasks().get(0).getDescription());
+  }
+
+}
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Tue Oct 11 17:42:14 2011
@@ -54,12 +54,14 @@ import org.apache.hadoop.hbase.client.De
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.Progressable;
import com.google.common.base.Joiner;
+import org.mockito.Mockito;
/**
* Test class fosr the Store
@@ -497,7 +499,7 @@ public class TestStore extends TestCase
private static void flushStore(Store store, long id) throws IOException {
StoreFlusher storeFlusher = store.getStoreFlusher(id);
storeFlusher.prepare();
- storeFlusher.flushCache();
+ storeFlusher.flushCache(Mockito.mock(MonitoredTask.class)); // this test does not inspect flush progress, so a Mockito mock stands in for a real task
storeFlusher.commit();
}
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1181923&r1=1181922&r2=1181923&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Tue Oct 11 17:42:14 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Ge
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
@@ -52,6 +53,7 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
/**
* Test replay of edits out of a WAL split.
@@ -373,9 +375,11 @@ public class TestWALReplay {
try {
final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri,
null) {
- protected boolean internalFlushcache(HLog wal, long myseqid)
+ protected boolean internalFlushcache(HLog wal, long myseqid,
+ MonitoredTask status)
throws IOException {
- boolean b = super.internalFlushcache(wal, myseqid);
+ boolean b = super.internalFlushcache(wal, myseqid,
+ Mockito.mock(MonitoredTask.class));
flushcount.incrementAndGet();
return b;
};
@@ -470,4 +474,4 @@ public class TestWALReplay {
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
return wal;
}
-}
\ No newline at end of file
+}