Posted to commits@hbase.apache.org by ap...@apache.org on 2009/01/09 02:55:35 UTC
svn commit: r732909 [2/2] - in /hadoop/hbase/branches/0.19_on_hadoop_0.18:
./ bin/ src/java/org/apache/hadoop/hbase/
src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/ipc/
src/java/org/apache/hadoop/hbase/master/ src/java/org/ap...
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=732909&r1=732908&r2=732909&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Jan 8 17:55:34 2009
@@ -137,6 +137,12 @@
final HRegionInfo regionInfo;
final Path regiondir;
private final Path regionCompactionDir;
+
+ /*
+ * Set this when scheduling compaction if you want the next compaction to
+ * be a major compaction. Cleared each time through the compaction code.
+ */
+ private volatile boolean forceMajorCompaction = false;
/*
* Data structure of write state flags used coordinating flushes,
@@ -648,6 +654,14 @@
}
}
+ void setForceMajorCompaction(final boolean b) {
+ this.forceMajorCompaction = b;
+ }
+
+ boolean getForceMajorCompaction() {
+ return this.forceMajorCompaction;
+ }
+
/**
* Called by compaction thread and after region is opened to compact the
* HStores if necessary.
@@ -663,7 +677,9 @@
* @throws IOException
*/
public byte [] compactStores() throws IOException {
- return compactStores(false);
+ boolean majorCompaction = this.forceMajorCompaction;
+ this.forceMajorCompaction = false;
+ return compactStores(majorCompaction);
}
/*
@@ -1059,7 +1075,7 @@
* @throws IOException
*/
public RowResult getClosestRowBefore(final byte [] row,
- final byte [] columnFamily)
+ final byte [] columnFamily)
throws IOException{
// look across all the HStores for this region and determine what the
// closest key is across all column families, since the data may be sparse
@@ -1068,20 +1084,22 @@
splitsAndClosesLock.readLock().lock();
try {
HStore store = getStore(columnFamily);
- // get the closest key
+ // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
byte [] closestKey = store.getRowKeyAtOrBefore(row);
- // If it happens to be an exact match, we can stop looping.
+ // If it happens to be an exact match, we can stop.
// Otherwise, we need to check if it's the max and move to the next
- if (HStoreKey.equalsTwoRowKeys(regionInfo, row, closestKey)) {
- key = new HStoreKey(closestKey, this.regionInfo);
- } else if (closestKey != null &&
- (key == null || HStoreKey.compareTwoRowKeys(
- regionInfo,closestKey, key.getRow()) > 0) ) {
- key = new HStoreKey(closestKey, this.regionInfo);
- } else {
+ if (closestKey != null) {
+ if (HStoreKey.equalsTwoRowKeys(regionInfo, row, closestKey)) {
+ key = new HStoreKey(closestKey, this.regionInfo);
+ }
+ if (key == null) {
+ key = new HStoreKey(closestKey, this.regionInfo);
+ }
+ }
+ if (key == null) {
return null;
}
-
+
// Now that we've found our key, get the values
HbaseMapWritable<byte [], Cell> cells =
new HbaseMapWritable<byte [], Cell>();
@@ -1275,6 +1293,99 @@
}
}
+
+ /**
+ * Performs an atomic check and save operation. Checks if
+ * the specified expected values have changed, and if not
+ * applies the update.
+ *
+ * @param b the update to apply
+ * @param expectedValues the expected values to check
+ * @param lockid
+ * @param writeToWAL whether or not to write to the write ahead log
+ * @return true if update was applied
+ * @throws IOException
+ */
+ public boolean checkAndSave(BatchUpdate b,
+ HbaseMapWritable<byte[], byte[]> expectedValues, Integer lockid,
+ boolean writeToWAL)
+ throws IOException {
+ // This is basically a copy of batchUpdate with the atomic check and save
+ // added in, so read this method alongside batchUpdate. I have commented
+ // the areas I changed; where I have not changed anything, refer to the
+ // comments in the batchUpdate method.
+ boolean success = true;
+ checkReadOnly();
+ checkResources();
+ splitsAndClosesLock.readLock().lock();
+ try {
+ byte[] row = b.getRow();
+ Integer lid = getLock(lockid,row);
+ try {
+ Set<byte[]> keySet = expectedValues.keySet();
+ Map<byte[],Cell> actualValues = this.getFull(row,keySet,
+ HConstants.LATEST_TIMESTAMP, 1,lid);
+ for (byte[] key : keySet) {
+ // If test fails exit
+ if(!Bytes.equals(actualValues.get(key).getValue(),
+ expectedValues.get(key))) {
+ success = false;
+ break;
+ }
+ }
+
+ if (success) {
+ long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
+ System.currentTimeMillis(): b.getTimestamp();
+ List<byte []> deletes = null;
+ for (BatchOperation op: b) {
+ HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime,
+ this.regionInfo);
+ byte[] val = null;
+ if (op.isPut()) {
+ val = op.getValue();
+ if (HLogEdit.isDeleted(val)) {
+ throw new IOException("Cannot insert value: " + val);
+ }
+ } else {
+ if (b.getTimestamp() == LATEST_TIMESTAMP) {
+ // Save off these deletes
+ if (deletes == null) {
+ deletes = new ArrayList<byte []>();
+ }
+ deletes.add(op.getColumn());
+ } else {
+ val = HLogEdit.deleteBytes.get();
+ }
+ }
+ if (val != null) {
+ localput(lid, key, val);
+ }
+ }
+ TreeMap<HStoreKey, byte[]> edits =
+ this.targetColumns.remove(lid);
+ if (edits != null && edits.size() > 0) {
+ update(edits, writeToWAL);
+ }
+ if (deletes != null && deletes.size() > 0) {
+ // We have some LATEST_TIMESTAMP deletes to run.
+ for (byte [] column: deletes) {
+ deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
+ }
+ }
+ }
+ } catch (IOException e) {
+ this.targetColumns.remove(Long.valueOf(lid));
+ throw e;
+ } finally {
+ if(lockid == null) releaseRowLock(lid);
+ }
+ } finally {
+ splitsAndClosesLock.readLock().unlock();
+ }
+ return success;
+ }
+
/*
* Check if resources to support an update.
*
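The checkAndSave hunk above is the heart of this commit: under the row lock it compares each expected column value against the current cell and applies the BatchUpdate only if every comparison succeeds. A minimal, self-contained sketch of that check-then-apply pattern (class and method names here are hypothetical, not the HBase API):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the atomic check-and-save pattern: under mutual exclusion,
 * verify every expected value against the current row state, and apply
 * the update only if all expectations hold.
 */
public class CheckAndSaveSketch {
  private final Map<String, byte[]> row = new HashMap<String, byte[]>();

  public synchronized boolean checkAndSave(Map<String, byte[]> expected,
      Map<String, byte[]> update) {
    for (Map.Entry<String, byte[]> e : expected.entrySet()) {
      // Arrays.equals handles nulls: a missing column only matches a
      // null expectation.
      if (!Arrays.equals(row.get(e.getKey()), e.getValue())) {
        return false;   // check failed; nothing is written
      }
    }
    row.putAll(update); // all checks passed; apply atomically
    return true;
  }
}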
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=732909&r1=732908&r2=732909&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Jan 8 17:55:34 2009
@@ -77,6 +77,7 @@
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.ValueOverMaxLengthException;
+import org.apache.hadoop.hbase.HMsg.Type;
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.client.ServerConnectionManager;
@@ -534,6 +535,7 @@
/**
* Run and wait on passed thread in HRS context.
* @param t
+ * @param dfsShutdownWait
*/
public void runThread(final Thread t, final long dfsShutdownWait) {
if (t == null) {
@@ -727,7 +729,7 @@
* Thread for toggling safemode after some configurable interval.
*/
private class SafeModeThread extends Thread {
-
+ @Override
public void run() {
// first, wait the required interval before turning off safemode
int safemodeInterval =
@@ -761,7 +763,9 @@
-1
};
for (int i = 0; i < limitSteps.length; i++) {
- if (LOG.isDebugEnabled()) {
+ // Just log changes.
+ if (compactSplitThread.getLimit() != limitSteps[i] &&
+ LOG.isDebugEnabled()) {
LOG.debug("setting compaction limit to " + limitSteps[i]);
}
compactSplitThread.setLimit(limitSteps[i]);
@@ -1224,6 +1228,7 @@
continue;
}
LOG.info("Worker: " + e.msg);
+ HRegion region = null;
HRegionInfo info = e.msg.getRegionInfo();
switch(e.msg.getType()) {
@@ -1257,28 +1262,29 @@
closeRegion(e.msg.getRegionInfo(), false);
break;
- case MSG_REGION_SPLIT: {
- // Force split a region
- HRegion region = getRegion(info.getRegionName());
- // flush the memcache for the region
+ case MSG_REGION_SPLIT:
+ region = getRegion(info.getRegionName());
region.flushcache();
- // flag that the region should be split
region.regionInfo.shouldSplit(true);
- // force a compaction
+ // force a compaction; split will be side-effect.
compactSplitThread.compactionRequested(region,
- "MSG_REGION_SPLIT");
- } break;
+ e.msg.getType().name());
+ break;
- case MSG_REGION_COMPACT: {
+ case MSG_REGION_MAJOR_COMPACT:
+ case MSG_REGION_COMPACT:
// Compact a region
- HRegion region = getRegion(info.getRegionName());
- // flush the memcache for the region
- region.flushcache();
- // force a compaction
+ region = getRegion(info.getRegionName());
compactSplitThread.compactionRequested(region,
- "MSG_REGION_COMPACT");
- } break;
-
+ e.msg.isType(Type.MSG_REGION_MAJOR_COMPACT),
+ e.msg.getType().name());
+ break;
+
+ case MSG_REGION_FLUSH:
+ region = getRegion(info.getRegionName());
+ region.flushcache();
+ break;
+
default:
throw new AssertionError(
"Impossible state during msg processing. Instruction: "
@@ -1622,6 +1628,26 @@
return -1;
}
+ public boolean checkAndSave(final byte [] regionName, final BatchUpdate b,
+ final HbaseMapWritable<byte[],byte[]> expectedValues)
+ throws IOException {
+ if (b.getRow() == null)
+ throw new IllegalArgumentException("update has null row");
+ checkOpen();
+ this.requestCount.incrementAndGet();
+ HRegion region = getRegion(regionName);
+ validateValuesLength(b, region);
+ try {
+ cacheFlusher.reclaimMemcacheMemory();
+ boolean result = region.checkAndSave(b,
+ expectedValues,getLockFromId(b.getRowLock()), false);
+ return result;
+ } catch (Throwable t) {
+ throw convertThrowableToIOE(cleanup(t));
+ }
+ }
+
+
/**
* Utility method to verify values length
* @param batchUpdate The update to verify
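The HRegionServer changes fold MSG_REGION_MAJOR_COMPACT and MSG_REGION_COMPACT into a single switch case, passing the major/minor distinction as a boolean to the compact-split thread, and add a MSG_REGION_FLUSH case. A standalone sketch of that dispatch shape (enum and method names are illustrative, not the HBase ones):

/**
 * Sketch of the restructured worker-message dispatch: one case handles
 * both compaction messages, and the major/minor distinction travels as
 * a boolean into the compaction request.
 */
public class DispatchSketch {
  enum MsgType { SPLIT, COMPACT, MAJOR_COMPACT, FLUSH }

  void dispatch(MsgType type) {
    switch (type) {
      case SPLIT:
        flushcache();
        // Force a compaction; the split happens as a side effect.
        requestCompaction(false, type.name());
        break;
      case MAJOR_COMPACT:
      case COMPACT:
        // One case for both; the flag says whether it is major.
        requestCompaction(type == MsgType.MAJOR_COMPACT, type.name());
        break;
      case FLUSH:
        flushcache();
        break;
      default:
        throw new AssertionError("Unhandled message type: " + type);
    }
  }

  void flushcache() { /* flush the region's memcache */ }
  void requestCompaction(boolean major, String why) { /* queue it */ }
}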
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=732909&r1=732908&r2=732909&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Jan 8 17:55:34 2009
@@ -786,7 +786,7 @@
if (stats == null || stats.length == 0) {
return 0l;
}
- long lowTimestamp = Long.MAX_VALUE;
+ long lowTimestamp = Long.MAX_VALUE;
for (int i = 0; i < stats.length; i++) {
long timestamp = stats[i].getModificationTime();
if (timestamp < lowTimestamp){
@@ -966,7 +966,12 @@
* @return True if we should run a major compaction.
*/
boolean isMajorCompaction() throws IOException {
- return isMajorCompaction(null);
+ List<HStoreFile> filesToCompact = null;
+ synchronized (storefiles) {
+ // filesToCompact are sorted oldest to newest.
+ filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
+ }
+ return isMajorCompaction(filesToCompact);
}
/*
@@ -977,25 +982,24 @@
throws IOException {
boolean result = false;
Path mapdir = HStoreFile.getMapDir(this.basedir, this.info.getEncodedName(),
- this.family.getName());
+ this.family.getName());
long lowTimestamp = getLowestTimestamp(fs, mapdir);
- if (lowTimestamp < (System.currentTimeMillis() - this.majorCompactionTime) &&
- lowTimestamp > 0l) {
+ long now = System.currentTimeMillis();
+ if (lowTimestamp > 0l && lowTimestamp < (now - this.majorCompactionTime)) {
// Major compaction time has elapsed.
- long elapsedTime = System.currentTimeMillis() - lowTimestamp;
+ long elapsedTime = now - lowTimestamp;
if (filesToCompact != null && filesToCompact.size() == 1 &&
filesToCompact.get(0).isMajorCompaction() &&
(this.ttl == HConstants.FOREVER || elapsedTime < this.ttl)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping major compaction because only one (major) " +
- "compacted file only and elapsedTime " + elapsedTime +
- " is < ttl=" + this.ttl);
+ LOG.debug("Skipping major compaction of " + this.storeNameStr +
+ " because one (major) compacted file only and elapsedTime " +
+ elapsedTime + "ms is < ttl=" + this.ttl);
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Major compaction triggered on store: " +
- this.storeNameStr + ". Time since last major compaction: " +
- ((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds");
+ LOG.debug("Major compaction triggered on store " + this.storeNameStr +
+ "; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
}
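The rewritten isMajorCompaction above hoists System.currentTimeMillis() into a local, reorders the timestamp guard, and keeps the single-major-file short-circuit. A simplified, self-contained sketch of that decision, assuming illustrative parameter names:

/**
 * Sketch of the time-based major-compaction test: trigger when the
 * oldest store file predates the configured major-compaction period,
 * unless the store is already a single major-compacted file still
 * within its TTL.
 */
class MajorCompactionCheckSketch {
  static final long FOREVER = Long.MAX_VALUE;

  boolean isMajorCompaction(long lowestFileTimestamp, int fileCount,
      boolean singleFileIsMajor, long majorCompactionPeriodMs, long ttlMs) {
    long now = System.currentTimeMillis();
    if (lowestFileTimestamp <= 0
        || lowestFileTimestamp >= now - majorCompactionPeriodMs) {
      return false; // nothing old enough to warrant a major compaction
    }
    long elapsed = now - lowestFileTimestamp;
    // Skip if there is only one file, it already came from a major
    // compaction, and none of its data can have expired yet.
    if (fileCount == 1 && singleFileIsMajor
        && (ttlMs == FOREVER || elapsed < ttlMs)) {
      return false;
    }
    return true;
  }
}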
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java?rev=732909&r1=732908&r2=732909&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java Thu Jan 8 17:55:34 2009
@@ -26,6 +26,7 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
@@ -59,6 +60,9 @@
// Used around transition from no storefile to the first.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ // Used to indicate that the scanner has closed (see HBASE-1107)
+ private final AtomicBoolean closing = new AtomicBoolean(false);
/** Create an Scanner with a handle on the memcache and HStore files. */
@SuppressWarnings("unchecked")
@@ -294,6 +298,7 @@
}
public void close() {
+ this.closing.set(true);
this.store.deleteChangedReaderObserver(this);
doClose();
}
@@ -309,6 +314,9 @@
// Implementation of ChangedReadersObserver
public void updateReaders() throws IOException {
+ if (this.closing.get()) {
+ return;
+ }
this.lock.writeLock().lock();
try {
MapFile.Reader [] readers = this.store.getReaders();
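The HStoreScanner change guards updateReaders() with an AtomicBoolean that close() sets first, so a reader-change notification racing with scanner close becomes a no-op (HBASE-1107). A minimal sketch of the pattern, with illustrative names:

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Sketch of the closing-flag guard: close() publishes the flag before
 * tearing anything down, so a concurrent updateReaders() returns early
 * instead of touching state that is going away.
 */
class ClosableScannerSketch {
  private final AtomicBoolean closing = new AtomicBoolean(false);

  public void close() {
    closing.set(true);  // published before any teardown
    // deregister from the store, release readers, etc.
  }

  public void updateReaders() {
    if (closing.get()) {
      return;           // scanner is closing; nothing to refresh
    }
    // re-open readers under the write lock...
  }
}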
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java?rev=732909&r1=732908&r2=732909&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java Thu Jan 8 17:55:34 2009
@@ -75,7 +75,7 @@
assertTrue(m.size() == 1);
// tell the master to split the table
- admin.modifyTable(tableName, HConstants.MODIFY_TABLE_SPLIT);
+ admin.split(Bytes.toString(tableName));
// give some time for the split to happen
Thread.sleep(15 * 1000);
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/TestHTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/TestHTable.java?rev=732909&r1=732908&r2=732909&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/TestHTable.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/TestHTable.java Thu Jan 8 17:55:34 2009
@@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -48,6 +49,67 @@
private static final byte [] attrName = Bytes.toBytes("TESTATTR");
private static final byte [] attrValue = Bytes.toBytes("somevalue");
+ public void testCheckAndSave() throws IOException {
+ HTable table = null;
+ HColumnDescriptor column2 =
+ new HColumnDescriptor(Bytes.toBytes("info2:"));
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ HTableDescriptor testTableADesc =
+ new HTableDescriptor(tableAname);
+ testTableADesc.addFamily(column);
+ testTableADesc.addFamily(column2);
+ admin.createTable(testTableADesc);
+
+ table = new HTable(conf, tableAname);
+ BatchUpdate batchUpdate = new BatchUpdate(row);
+ BatchUpdate batchUpdate2 = new BatchUpdate(row);
+ BatchUpdate batchUpdate3 = new BatchUpdate(row);
+
+ HbaseMapWritable<byte[],byte[]> expectedValues =
+ new HbaseMapWritable<byte[],byte[]>();
+ HbaseMapWritable<byte[],byte[]> badExpectedValues =
+ new HbaseMapWritable<byte[],byte[]>();
+
+ for(int i = 0; i < 5; i++) {
+ // This batchupdate is our initial batch update.
+ // As such we also set our expected values to the same values,
+ // since we will be comparing the two.
+ batchUpdate.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i));
+ expectedValues.put(Bytes.toBytes(COLUMN_FAMILY_STR+i), Bytes.toBytes(i));
+
+ badExpectedValues.put(Bytes.toBytes(COLUMN_FAMILY_STR+i),
+ Bytes.toBytes(500));
+
+ // This is our second batchupdate that we will use to update the initial
+ // batchupdate
+ batchUpdate2.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i+1));
+
+ // This final batch update is used to check that the now-stale expected
+ // values cause the check to fail
+ batchUpdate3.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i+2));
+ }
+
+ // Initialize rows
+ table.commit(batchUpdate);
+
+ // check that incorrect expected values make checkAndSave return false
+ assertFalse(table.checkAndSave(batchUpdate2,badExpectedValues,null));
+
+ // make sure first expected values are correct
+ assertTrue(table.checkAndSave(batchUpdate2, expectedValues,null));
+
+ // make sure check and save truly saves the data after checking the expected
+ // values
+ RowResult r = table.getRow(row);
+ byte[][] columns = batchUpdate2.getColumns();
+ for(int i = 0;i < columns.length;i++) {
+ assertTrue(Bytes.equals(r.get(columns[i]).getValue(),batchUpdate2.get(columns[i])));
+ }
+
+ // make sure that the old expected values fail
+ assertFalse(table.checkAndSave(batchUpdate3, expectedValues,null));
+ }
+
/**
* the test
* @throws IOException
@@ -259,4 +321,57 @@
}
}
+ public void testGetClosestRowBefore() throws IOException {
+ HColumnDescriptor column2 =
+ new HColumnDescriptor(Bytes.toBytes("info2:"));
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ HTableDescriptor testTableADesc =
+ new HTableDescriptor(tableAname);
+ testTableADesc.addFamily(column);
+ testTableADesc.addFamily(column2);
+ admin.createTable(testTableADesc);
+
+ byte[] firstRow = Bytes.toBytes("ro");
+ byte[] beforeFirstRow = Bytes.toBytes("rn");
+ byte[] beforeSecondRow = Bytes.toBytes("rov");
+
+ HTable table = new HTable(conf, tableAname);
+ BatchUpdate batchUpdate = new BatchUpdate(firstRow);
+ BatchUpdate batchUpdate2 = new BatchUpdate(row);
+ byte[] zero = new byte[]{0};
+ byte[] one = new byte[]{1};
+ byte[] columnFamilyBytes = Bytes.toBytes(COLUMN_FAMILY_STR);
+
+ batchUpdate.put(COLUMN_FAMILY_STR,zero);
+ batchUpdate2.put(COLUMN_FAMILY_STR,one);
+
+ table.commit(batchUpdate);
+ table.commit(batchUpdate2);
+
+ RowResult result = null;
+
+ // Test before first that null is returned
+ result = table.getClosestRowBefore(beforeFirstRow, columnFamilyBytes);
+ assertTrue(result == null);
+
+ // Test at first that first is returned
+ result = table.getClosestRowBefore(firstRow, columnFamilyBytes);
+ assertTrue(result.containsKey(COLUMN_FAMILY_STR));
+ assertTrue(Bytes.equals(result.get(COLUMN_FAMILY_STR).getValue(), zero));
+
+ // Test in between first and second that first is returned
+ result = table.getClosestRowBefore(beforeSecondRow, columnFamilyBytes);
+ assertTrue(result.containsKey(COLUMN_FAMILY_STR));
+ assertTrue(Bytes.equals(result.get(COLUMN_FAMILY_STR).getValue(), zero));
+
+ // Test at second make sure second is returned
+ result = table.getClosestRowBefore(row, columnFamilyBytes);
+ assertTrue(result.containsKey(COLUMN_FAMILY_STR));
+ assertTrue(Bytes.equals(result.get(COLUMN_FAMILY_STR).getValue(), one));
+
+ // Test after second, make sure second is returned
+ result = table.getClosestRowBefore(Bytes.add(row,one), columnFamilyBytes);
+ assertTrue(result.containsKey(COLUMN_FAMILY_STR));
+ assertTrue(Bytes.equals(result.get(COLUMN_FAMILY_STR).getValue(), one));
+ }
}
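The semantics testGetClosestRowBefore pins down, i.e. return the entry at or before the probe row, or null when every stored row sorts after it, are the same at-or-before lookup that java.util.TreeMap.floorEntry provides; a tiny model by analogy (not the HBase implementation):

import java.util.TreeMap;

/**
 * Model of getClosestRowBefore semantics: floorEntry returns the
 * greatest key less than or equal to the probe, or null if none.
 */
class ClosestRowBeforeSketch {
  private final TreeMap<String, byte[]> rows = new TreeMap<String, byte[]>();

  byte[] getClosestRowBefore(String row) {
    java.util.Map.Entry<String, byte[]> e = rows.floorEntry(row);
    return e == null ? null : e.getValue();
  }
}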