You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:16:50 UTC
svn commit: r1181527 - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/compactions/
test/java/org/apache/hadoop/hbase/regionserver/
Author: nspiegelberg
Date: Tue Oct 11 02:16:50 2011
New Revision: 1181527
URL: http://svn.apache.org/viewvc?rev=1181527&view=rev
Log:
First cut of store based compactions
Summary:
Changed all compaction requests to be per store instead of per region.
Note: this is not complete, wanted to get a diff out and continue working.
CompactSplitThread.java:run() is not completely done the way we want to - wanted
to discuss how we wanted to proceed here before doing it.
Test Plan:
Not tested yet.
Reviewed By: nspiegelberg
Reviewers: jgray, nspiegelberg, aaiyer, kannan
Commenters: kannan
CC: , kranganathan, nspiegelberg, kannan
Differential Revision: 233737
Added:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1181527&r1=1181526&r2=1181527&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Oct 11 02:16:50 2011
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils;
@@ -69,37 +71,45 @@ class CompactSplitThread extends Thread
@Override
public void run() {
while (!this.server.isStopRequested()) {
+ CompactionRequest compactionRequest = null;
HRegion r = null;
try {
- r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
- if (r != null) {
+ compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
+ if (compactionRequest != null) {
lock.lock();
try {
if(!this.server.isStopRequested()) {
// Don't interrupt us while we are working
- byte [] midKey = r.compactStores();
+ r = compactionRequest.getHRegion();
+ byte [] midKey = r.compactStore(compactionRequest.getStore());
if (r.getLastCompactInfo() != null) { // compaction aborted?
this.server.getMetrics().addCompaction(r.getLastCompactInfo());
}
if (LOG.isDebugEnabled()) {
- HRegion next = this.compactionQueue.peek();
+ CompactionRequest next = this.compactionQueue.peek();
LOG.debug("Just finished a compaction. " +
" Current Compaction Queue: size=" +
getCompactionQueueSize() +
((next != null) ?
- ", topPri=" + next.getCompactPriority() : ""));
+ ", topPri=" + next.getPriority() : ""));
}
if (!this.server.isStopRequested()) {
// requests that were added during compaction will have a
// stale priority. remove and re-insert to update priority
- boolean hadCompaction = compactionQueue.remove(r);
+ boolean hadCompaction = compactionQueue.remove(compactionRequest);
if (midKey != null) {
split(r, midKey);
} else if (hadCompaction) {
- compactionQueue.add(r);
- } else if (r.getCompactPriority() < PRIORITY_USER) {
+ // recompute the priority for a request already in the queue
+ LOG.debug("Re-computing priority for compaction request " + compactionRequest);
+ compactionRequest.setPriority(compactionRequest.getStore().getCompactPriority());
+ compactionQueue.add(compactionRequest);
+ } else if (compactionRequest.getStore().getCompactPriority() < PRIORITY_USER) {
// degenerate case. recursively enqueue blocked regions
- compactionQueue.add(r);
+ LOG.debug("Re-queueing with " + compactionRequest.getStore().getCompactPriority() +
+ " priority for compaction request " + compactionRequest);
+ compactionRequest.setPriority(compactionRequest.getStore().getCompactPriority());
+ compactionQueue.add(compactionRequest);
}
}
}
@@ -135,7 +145,9 @@ class CompactSplitThread extends Thread
*/
public synchronized void requestCompaction(final HRegion r,
final String why) {
- requestCompaction(r, false, why, r.getCompactPriority());
+ for(Store s : r.getStores().values()) {
+ requestCompaction(r, s, false, why, s.getCompactPriority());
+ }
}
public synchronized void requestCompaction(final HRegion r,
@@ -143,12 +155,19 @@ class CompactSplitThread extends Thread
requestCompaction(r, false, why, p);
}
+ public synchronized void requestCompaction(final HRegion r,
+ final boolean force, final String why, int p) {
+ for(Store s : r.getStores().values()) {
+ requestCompaction(r, s, force, why, p);
+ }
+ }
+
/**
* @param r HRegion store belongs to
* @param force Whether next compaction should be major
* @param why Why compaction requested -- used in debug messages
*/
- public synchronized void requestCompaction(final HRegion r,
+ public synchronized void requestCompaction(final HRegion r, final Store s,
final boolean force, final String why, int priority) {
boolean addedToQueue = false;
@@ -159,16 +178,16 @@ class CompactSplitThread extends Thread
// tell the region to major-compact (and don't downgrade it)
if (force) {
- r.setForceMajorCompaction(force);
+ s.setForceMajorCompaction(force);
}
-
- addedToQueue = compactionQueue.add(r, priority);
-
+ CompactionRequest compactionRequest = new CompactionRequest(r, s, priority);
+ addedToQueue = compactionQueue.add(compactionRequest);
// only log if actually added to compaction queue...
if (addedToQueue && LOG.isDebugEnabled()) {
LOG.debug("Compaction " + (force? "(major) ": "") +
"requested for region " + r.getRegionNameAsString() +
"/" + r.getRegionInfo().getEncodedName() +
+ ", store " + s +
(why != null && !why.isEmpty()? " because: " + why: "") +
"; Priority: " + priority + "; Compaction queue size: " + compactionQueue.size());
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181527&r1=1181526&r2=1181527&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 02:16:50 2011
@@ -890,6 +890,20 @@ public class HRegion implements HeapSize
return compactStores();
}
+ /**
+ * Compact all the stores and return the split key of the first store that needs
+ * to be split.
+ */
+ public byte[] compactStores() throws IOException {
+ byte[] splitRow = null;
+ for(Store s : getStores().values()) {
+ if(splitRow == null) {
+ splitRow = compactStore(s);
+ }
+ }
+ return splitRow;
+ }
+
/*
* Called by compaction thread and after region is opened to compact the
* HStores if necessary.
@@ -905,7 +919,7 @@ public class HRegion implements HeapSize
* @return split row if split is needed
* @throws IOException e
*/
- public byte [] compactStores()
+ public byte [] compactStore(Store store)
throws IOException {
if (this.closing.get() || this.closed.get()) {
LOG.debug("Skipping compaction on " + this + " because closing/closed");
@@ -933,16 +947,12 @@ public class HRegion implements HeapSize
long startTime = EnvironmentEdgeManager.currentTimeMillis();
doRegionCompactionPrep();
long lastCompactSize = 0;
- long maxSize = -1;
boolean completed = false;
try {
- for (Store store: stores.values()) {
- final Store.StoreSize ss = store.compact();
- lastCompactSize += store.getLastCompactSize();
- if (ss != null && ss.getSize() > maxSize) {
- maxSize = ss.getSize();
- splitRow = ss.getSplitRow();
- }
+ final Store.StoreSize ss = store.compact();
+ lastCompactSize += store.getLastCompactSize();
+ if (ss != null) {
+ splitRow = ss.getSplitRow();
}
completed = true;
} catch (InterruptedIOException iioe) {
@@ -2230,6 +2240,10 @@ public class HRegion implements HeapSize
return this.stores.get(column);
}
+ public Map<byte[], Store> getStores() {
+ return this.stores;
+ }
+
//////////////////////////////////////////////////////////////////////////////
// Support code
//////////////////////////////////////////////////////////////////////////////
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java?rev=1181527&r1=1181526&r2=1181527&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java Tue Oct 11 02:16:50 2011
@@ -20,132 +20,62 @@
package org.apache.hadoop.hbase.regionserver;
import java.util.Collection;
-import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
/**
- * This class delegates to the BlockingQueue but wraps all HRegions in
+ * This class delegates to the BlockingQueue but wraps all Stores in
* compaction requests that hold the priority and the date requested.
*
* Implementation Note: With an elevation time of -1 there is the potential for
* starvation of the lower priority compaction requests as long as there is a
* constant stream of high priority requests.
*/
-public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
+public class PriorityCompactionQueue implements BlockingQueue<CompactionRequest> {
static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class);
- /**
- * This class represents a compaction request and holds the region, priority,
- * and time submitted.
- */
- private class CompactionRequest implements Comparable<CompactionRequest> {
- private final HRegion r;
- private final int p;
- private final Date date;
-
- public CompactionRequest(HRegion r, int p) {
- this(r, p, null);
- }
-
- public CompactionRequest(HRegion r, int p, Date d) {
- if (r == null) {
- throw new NullPointerException("HRegion cannot be null");
- }
-
- if (d == null) {
- d = new Date();
- }
-
- this.r = r;
- this.p = p;
- this.date = d;
- }
-
- /**
- * This function will define where in the priority queue the request will
- * end up. Those with the highest priorities will be first. When the
- * priorities are the same it will It will first compare priority then date
- * to maintain a FIFO functionality.
- *
- * <p>Note: The date is only accurate to the millisecond which means it is
- * possible that two requests were inserted into the queue within a
- * millisecond. When that is the case this function will break the tie
- * arbitrarily.
- */
- @Override
- public int compareTo(CompactionRequest request) {
- //NOTE: The head of the priority queue is the least element
- if (this.equals(request)) {
- return 0; //they are the same request
- }
- int compareVal;
-
- compareVal = p - request.p; //compare priority
- if (compareVal != 0) {
- return compareVal;
- }
-
- compareVal = date.compareTo(request.date);
- if (compareVal != 0) {
- return compareVal;
- }
-
- //break the tie arbitrarily
- return -1;
- }
-
- /** Gets the HRegion for the request */
- HRegion getHRegion() {
- return r;
- }
-
- /** Gets the priority for the request */
- int getPriority() {
- return p;
- }
-
- public String toString() {
- return "regionName=" + r.getRegionNameAsString() +
- ", priority=" + p + ", date=" + date;
- }
- }
-
/** The actual blocking queue we delegate to */
protected final BlockingQueue<CompactionRequest> queue =
new PriorityBlockingQueue<CompactionRequest>();
- /** Hash map of the HRegions contained within the Compaction Queue */
- private final HashMap<HRegion, CompactionRequest> regionsInQueue =
- new HashMap<HRegion, CompactionRequest>();
+ /** Hash map of the Stores contained within the Compaction Queue */
+ private final HashMap<Store, CompactionRequest> storesInQueue =
+ new HashMap<Store, CompactionRequest>();
/** Creates a new PriorityCompactionQueue with no priority elevation time */
public PriorityCompactionQueue() {
LOG.debug("Create PriorityCompactionQueue");
}
- /** If the region is not already in the queue it will add it and return a
+ /** If the store is not already in the queue it will add it and return a
* new compaction request object. If it is already present in the queue
* then it will return null.
* @param p If null it will use the default priority
* @return returns a compaction request if it isn't already in the queue
*/
- protected CompactionRequest addToRegionsInQueue(HRegion r, int p) {
+ protected CompactionRequest addToCompactionQueue(CompactionRequest newRequest) {
CompactionRequest queuedRequest = null;
- CompactionRequest newRequest = new CompactionRequest(r, p);
- synchronized (regionsInQueue) {
- queuedRequest = regionsInQueue.get(r);
+ synchronized (storesInQueue) {
+ queuedRequest = storesInQueue.get(newRequest.getStore());
if (queuedRequest == null ||
newRequest.getPriority() < queuedRequest.getPriority()) {
- LOG.trace("Inserting region in queue. " + newRequest);
- regionsInQueue.put(r, newRequest);
+ String reason = "";
+ if (newRequest.getPriority() < queuedRequest.getPriority()) {
+ reason = "Reason : priority changed from " +
+ queuedRequest.getPriority() + " to " +
+ newRequest.getPriority() + ". ";
+ }
+ LOG.debug("Inserting store in queue. " + reason + newRequest);
+ storesInQueue.put(newRequest.getStore(), newRequest);
} else {
- LOG.trace("Region already in queue, skipping. Queued: " + queuedRequest +
+ LOG.debug("Store already in queue, skipping. Queued: " + queuedRequest +
", requested: " + newRequest);
newRequest = null; // It is already present so don't add it
}
@@ -159,23 +89,25 @@ public class PriorityCompactionQueue imp
return newRequest;
}
- /** Removes the request from the regions in queue
+ /** Removes the request from the stores in queue
* @param p If null it will use the default priority
*/
- protected CompactionRequest removeFromRegionsInQueue(HRegion r) {
- if (r == null) return null;
+ protected CompactionRequest removeFromQueue(CompactionRequest c) {
+ if (c == null) return null;
- synchronized (regionsInQueue) {
- CompactionRequest cr = regionsInQueue.remove(r);
+ synchronized (storesInQueue) {
+ CompactionRequest cr = storesInQueue.remove(c.getStore());
if (cr == null) {
- LOG.warn("Removed a region it couldn't find in regionsInQueue: " + r);
+ LOG.warn("Removed a compaction request it couldn't find in storesInQueue: " +
+ "region = " + c.getHRegion() + ", store = " + c.getStore());
}
return cr;
}
}
- public boolean add(HRegion e, int p) {
- CompactionRequest request = this.addToRegionsInQueue(e, p);
+ @Override
+ public boolean add(CompactionRequest e) {
+ CompactionRequest request = this.addToCompactionQueue(e);
if (request != null) {
boolean result = queue.add(request);
queue.peek();
@@ -186,68 +118,50 @@ public class PriorityCompactionQueue imp
}
@Override
- public boolean add(HRegion e) {
- return add(e, e.getCompactPriority());
- }
-
- public boolean offer(HRegion e, int p) {
- CompactionRequest request = this.addToRegionsInQueue(e, p);
+ public boolean offer(CompactionRequest e) {
+ CompactionRequest request = this.addToCompactionQueue(e);
return (request != null)? queue.offer(request): false;
}
@Override
- public boolean offer(HRegion e) {
- return offer(e, e.getCompactPriority());
- }
-
- public void put(HRegion e, int p) throws InterruptedException {
- CompactionRequest request = this.addToRegionsInQueue(e, p);
+ public void put(CompactionRequest e) throws InterruptedException {
+ CompactionRequest request = this.addToCompactionQueue(e);
if (request != null) {
queue.put(request);
}
}
@Override
- public void put(HRegion e) throws InterruptedException {
- put(e, e.getCompactPriority());
- }
-
- public boolean offer(HRegion e, int p, long timeout, TimeUnit unit)
+ public boolean offer(CompactionRequest e, long timeout, TimeUnit unit)
throws InterruptedException {
- CompactionRequest request = this.addToRegionsInQueue(e, p);
+ CompactionRequest request = this.addToCompactionQueue(e);
return (request != null)? queue.offer(request, timeout, unit): false;
}
@Override
- public boolean offer(HRegion e, long timeout, TimeUnit unit)
- throws InterruptedException {
- return offer(e, e.getCompactPriority(), timeout, unit);
- }
-
- @Override
- public HRegion take() throws InterruptedException {
+ public CompactionRequest take() throws InterruptedException {
CompactionRequest cr = queue.take();
if (cr != null) {
- removeFromRegionsInQueue(cr.getHRegion());
- return cr.getHRegion();
+ removeFromQueue(cr);
+ return cr;
}
return null;
}
@Override
- public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException {
+ public CompactionRequest poll(long timeout, TimeUnit unit) throws InterruptedException {
CompactionRequest cr = queue.poll(timeout, unit);
if (cr != null) {
- removeFromRegionsInQueue(cr.getHRegion());
- return cr.getHRegion();
+ removeFromQueue(cr);
+ return cr;
}
return null;
}
@Override
- public boolean remove(Object r) {
- if (r instanceof HRegion) {
- CompactionRequest cr = removeFromRegionsInQueue((HRegion) r);
+ public boolean remove(Object o) {
+ if (o instanceof CompactionRequest) {
+ CompactionRequest cr = removeFromQueue((CompactionRequest) o);
if (cr != null) {
return queue.remove(cr);
}
@@ -257,21 +171,21 @@ public class PriorityCompactionQueue imp
}
@Override
- public HRegion remove() {
+ public CompactionRequest remove() {
CompactionRequest cr = queue.remove();
if (cr != null) {
- removeFromRegionsInQueue(cr.getHRegion());
- return cr.getHRegion();
+ removeFromQueue(cr);
+ return cr;
}
return null;
}
@Override
- public HRegion poll() {
+ public CompactionRequest poll() {
CompactionRequest cr = queue.poll();
if (cr != null) {
- removeFromRegionsInQueue(cr.getHRegion());
- return cr.getHRegion();
+ removeFromQueue(cr);
+ return cr;
}
return null;
}
@@ -283,9 +197,9 @@ public class PriorityCompactionQueue imp
@Override
public boolean contains(Object r) {
- if (r instanceof HRegion) {
- synchronized (regionsInQueue) {
- return regionsInQueue.containsKey((HRegion) r);
+ if (r instanceof CompactionRequest) {
+ synchronized (storesInQueue) {
+ return storesInQueue.containsKey((CompactionRequest) r);
}
} else if (r instanceof CompactionRequest) {
return queue.contains(r);
@@ -294,15 +208,15 @@ public class PriorityCompactionQueue imp
}
@Override
- public HRegion element() {
+ public CompactionRequest element() {
CompactionRequest cr = queue.element();
- return (cr != null)? cr.getHRegion(): null;
+ return (cr != null)? cr: null;
}
@Override
- public HRegion peek() {
+ public CompactionRequest peek() {
CompactionRequest cr = queue.peek();
- return (cr != null)? cr.getHRegion(): null;
+ return (cr != null)? cr: null;
}
@Override
@@ -317,14 +231,14 @@ public class PriorityCompactionQueue imp
@Override
public void clear() {
- regionsInQueue.clear();
+ storesInQueue.clear();
queue.clear();
}
// Unimplemented methods, collection methods
@Override
- public Iterator<HRegion> iterator() {
+ public Iterator<CompactionRequest> iterator() {
throw new UnsupportedOperationException("Not supported.");
}
@@ -344,7 +258,7 @@ public class PriorityCompactionQueue imp
}
@Override
- public boolean addAll(Collection<? extends HRegion> c) {
+ public boolean addAll(Collection<? extends CompactionRequest> c) {
throw new UnsupportedOperationException("Not supported.");
}
@@ -359,12 +273,12 @@ public class PriorityCompactionQueue imp
}
@Override
- public int drainTo(Collection<? super HRegion> c) {
+ public int drainTo(Collection<? super CompactionRequest> c) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
- public int drainTo(Collection<? super HRegion> c, int maxElements) {
+ public int drainTo(Collection<? super CompactionRequest> c, int maxElements) {
throw new UnsupportedOperationException("Not supported.");
}
}
\ No newline at end of file
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181527&r1=1181526&r2=1181527&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:16:50 2011
@@ -1456,7 +1456,7 @@ public class Store implements HeapSize {
/**
* @return The priority that this store should have in the compaction queue
*/
- int getCompactPriority() {
+ public int getCompactPriority() {
return this.blockingStoreFileCount - this.storefiles.size();
}
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1181527&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Oct 11 02:16:50 2011
@@ -0,0 +1,102 @@
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+ /**
+ * This class represents a compaction request and holds the region, priority,
+ * and time submitted.
+ */
+ public class CompactionRequest implements Comparable<CompactionRequest> {
+ static final Log LOG = LogFactory.getLog(CompactionRequest.class);
+ private final HRegion r;
+ private final Store s;
+ private int p;
+ private final Date date;
+
+ public CompactionRequest(HRegion r, Store s) {
+ this(r, s, s.getCompactPriority());
+ }
+
+ public CompactionRequest(HRegion r, Store s, int p) {
+ this(r, s, p, null);
+ }
+
+ public CompactionRequest(HRegion r, Store s, int p, Date d) {
+ if (r == null) {
+ throw new NullPointerException("HRegion cannot be null");
+ }
+
+ if (d == null) {
+ d = new Date();
+ }
+
+ this.r = r;
+ this.s = s;
+ this.p = p;
+ this.date = d;
+ }
+
+ /**
+ * This function will define where in the priority queue the request will
+ * end up. Those with the highest priorities will be first. When the
+ * priorities are the same it will It will first compare priority then date
+ * to maintain a FIFO functionality.
+ *
+ * <p>Note: The date is only accurate to the millisecond which means it is
+ * possible that two requests were inserted into the queue within a
+ * millisecond. When that is the case this function will break the tie
+ * arbitrarily.
+ */
+ @Override
+ public int compareTo(CompactionRequest request) {
+ //NOTE: The head of the priority queue is the least element
+ if (this.equals(request)) {
+ return 0; //they are the same request
+ }
+ int compareVal;
+
+ compareVal = p - request.p; //compare priority
+ if (compareVal != 0) {
+ return compareVal;
+ }
+
+ compareVal = date.compareTo(request.date);
+ if (compareVal != 0) {
+ return compareVal;
+ }
+
+ //break the tie arbitrarily
+ return -1;
+ }
+
+ /** Gets the HRegion for the request */
+ public HRegion getHRegion() {
+ return r;
+ }
+
+ /** Gets the Store for the request */
+ public Store getStore() {
+ return s;
+ }
+
+ /** Gets the priority for the request */
+ public int getPriority() {
+ return p;
+ }
+
+ /** Gets the priority for the request */
+ public void setPriority(int p) {
+ this.p = p;
+ }
+
+ public String toString() {
+ return "regionName=" + r.getRegionNameAsString() +
+ "storeName = " + new String(s.getFamily().getName()) +
+ ", priority=" + p + ", date=" + date;
+ }
+ }
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java?rev=1181527&r1=1181526&r2=1181527&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java Tue Oct 11 02:16:50 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionse
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -68,7 +69,7 @@ public class TestPriorityCompactionQueue
protected void getAndCheckRegion(PriorityCompactionQueue pq,
HRegion checkRegion) {
- HRegion r = pq.remove();
+ HRegion r = pq.remove().getHRegion();
if (r != checkRegion) {
Assert.assertTrue("Didn't get expected " + checkRegion + " got " + r, r
.equals(checkRegion));
@@ -76,7 +77,7 @@ public class TestPriorityCompactionQueue
}
protected void addRegion(PriorityCompactionQueue pq, HRegion r, int p) {
- pq.add(r, p);
+ pq.add(new CompactionRequest(r, null, p));
try {
// Sleep 10 millisecond so 2 things are not put in the queue within the
// same millisecond. The queue breaks ties arbitrarily between two