You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2018/07/09 19:03:23 UTC
[2/3] hbase git commit: HBASE-20806 Split style journal for flushes
and compactions
HBASE-20806 Split style journal for flushes and compactions
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9faebd26
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9faebd26
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9faebd26
Branch: refs/heads/branch-1.2
Commit: 9faebd26861080b05b3e0448b152721c35c4ddcc
Parents: a5c69c8
Author: Abhishek Singh Chouhan <ac...@apache.org>
Authored: Fri Jul 6 12:02:18 2018 +0530
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Jul 9 10:55:51 2018 -0700
----------------------------------------------------------------------
.../hadoop/hbase/monitoring/MonitoredTask.java | 20 +++++
.../hbase/monitoring/MonitoredTaskImpl.java | 82 ++++++++++++++++++++
.../hadoop/hbase/regionserver/HRegion.java | 8 +-
.../hbase/monitoring/TestTaskMonitor.java | 23 ++++++
4 files changed, 132 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9faebd26/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
index ff3667b..34fd8ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.monitoring;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -32,6 +33,12 @@ public interface MonitoredTask extends Cloneable {
ABORTED;
}
+ public interface StatusJournalEntry {
+ String getStatus();
+
+ long getTimeStamp();
+ }
+
long getStartTime();
String getDescription();
String getStatus();
@@ -49,6 +56,19 @@ public interface MonitoredTask extends Cloneable {
void setStatus(String status);
void setDescription(String description);
+ List<StatusJournalEntry> getStatusJournal();
+
+ /**
+ * Enable journal that will store all statuses that have been set along with the time stamps when
+ * they were set.
+ * @param includeCurrentStatus whether to include the current set status in the journal
+ */
+ void enableStatusJournal(boolean includeCurrentStatus);
+
+ void disableStatusJournal();
+
+ String prettyPrintJournal();
+
/**
* Explicitly mark this status as able to be cleaned up,
* even though it might not be complete.
http://git-wip-us.apache.org/repos/asf/hbase/blob/9faebd26/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
index 27aaceb..5270e7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
@@ -19,10 +19,14 @@
package org.apache.hadoop.hbase.monitoring;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.util.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
@InterfaceAudience.Private
@@ -36,6 +40,9 @@ class MonitoredTaskImpl implements MonitoredTask {
protected volatile State state = State.RUNNING;
+ private boolean journalEnabled = false;
+ private List<StatusJournalEntry> journal;
+
private static final ObjectMapper MAPPER = new ObjectMapper();
public MonitoredTaskImpl() {
@@ -44,6 +51,35 @@ class MonitoredTaskImpl implements MonitoredTask {
stateTime = startTime;
}
+ private static class StatusJournalEntryImpl implements StatusJournalEntry {
+ private long statusTime;
+ private String status;
+
+ public StatusJournalEntryImpl(String status, long statusTime) {
+ this.status = status;
+ this.statusTime = statusTime;
+ }
+
+ @Override
+ public String getStatus() {
+ return status;
+ }
+
+ @Override
+ public long getTimeStamp() {
+ return statusTime;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(status);
+ sb.append(" at ");
+ sb.append(statusTime);
+ return sb.toString();
+ }
+ }
+
@Override
public synchronized MonitoredTaskImpl clone() {
try {
@@ -119,6 +155,9 @@ class MonitoredTaskImpl implements MonitoredTask {
public void setStatus(String status) {
this.status = status;
statusTime = System.currentTimeMillis();
+ if (journalEnabled) {
+ journal.add(new StatusJournalEntryImpl(this.status, statusTime));
+ }
}
protected void setState(State state) {
@@ -178,4 +217,47 @@ class MonitoredTaskImpl implements MonitoredTask {
return sb.toString();
}
+ /**
+ * Returns the status journal. This implementation of status journal is not thread-safe. Currently
+ * we use this to track various stages of flushes and compactions where we can use this/pretty
+ * print for post task analysis, by which time we are already done changing states (writing to
+ * journal)
+ */
+ @Override
+ public List<StatusJournalEntry> getStatusJournal() {
+ if (journal == null) {
+ return Collections.emptyList();
+ } else {
+ return Collections.unmodifiableList(journal);
+ }
+ }
+
+ /**
+ * Enables journaling of this monitored task, the first invocation will lazily initialize the
+ * journal. The journal implementation itself and this method are not thread safe
+ */
+ @Override
+ public void enableStatusJournal(boolean includeCurrentStatus) {
+ if (journalEnabled && journal != null) {
+ return;
+ }
+ journalEnabled = true;
+ if (journal == null) {
+ journal = new ArrayList<StatusJournalEntry>();
+ }
+ if (includeCurrentStatus) {
+ journal.add(new StatusJournalEntryImpl(status, statusTime));
+ }
+ }
+
+ @Override
+ public void disableStatusJournal() {
+ journalEnabled = false;
+ }
+
+ @Override
+ public String prettyPrintJournal() {
+ return StringUtils.join("\n\t", getStatusJournal());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9faebd26/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a19b3e8..0ead229 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1865,6 +1865,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
+ status.enableStatusJournal(false);
if (this.closed.get()) {
String msg = "Skipping compaction on " + this + " because closed";
LOG.debug(msg);
@@ -1914,7 +1915,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} finally {
try {
if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
- if (status != null) status.cleanup();
+ if (status != null) {
+ LOG.debug("Compaction status journal:\n\t" + status.prettyPrintJournal());
+ status.cleanup();
+ }
} finally {
lock.readLock().unlock();
}
@@ -1957,6 +1961,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
}
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
+ status.enableStatusJournal(false);
status.setStatus("Acquiring readlock on region");
// block waiting for the lock for flushing cache
lock.readLock().lock();
@@ -2016,6 +2021,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
} finally {
lock.readLock().unlock();
+ LOG.debug("Flush status journal:\n\t" + status.prettyPrintJournal());
status.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9faebd26/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
index ff9bd57..8d6cdd7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
@@ -100,6 +100,29 @@ public class TestTaskMonitor {
assertEquals("task 10", tm.getTasks().get(0).getDescription());
}
+ @Test
+ public void testStatusJournal() {
+ TaskMonitor tm = new TaskMonitor();
+ MonitoredTask task = tm.createStatus("Test task");
+ assertTrue(task.getStatusJournal().isEmpty());
+ task.disableStatusJournal();
+ task.setStatus("status1");
+ // journal should be empty since it is disabled
+ assertTrue(task.getStatusJournal().isEmpty());
+ task.enableStatusJournal(true);
+ // check existing status entered in journal
+ assertEquals("status1", task.getStatusJournal().get(0).getStatus());
+ assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
+ task.disableStatusJournal();
+ task.setStatus("status2");
+ // check status 2 not added since disabled
+ assertEquals(1, task.getStatusJournal().size());
+ task.enableStatusJournal(false);
+ // size should still be 1 since we didn't include current status
+ assertEquals(1, task.getStatusJournal().size());
+ task.setStatus("status3");
+ assertEquals("status3", task.getStatusJournal().get(1).getStatus());
+ }
}