You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/04/12 12:39:58 UTC
svn commit: r764289 [3/8] - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/filter/ src/java/org/apache/hadoop/hbase/io/
src/java/org/apache/hadoop/hbase/io/hfile/ s...
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Apr 12 10:39:55 2009
@@ -22,15 +22,16 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
-import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -42,17 +43,17 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ColumnNameParseException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionHistorian;
+import org.apache.hadoop.hbase.ValueOverMaxLengthException;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.BatchUpdate;
@@ -122,10 +123,8 @@
private final Map<Integer, byte []> locksToRows =
new ConcurrentHashMap<Integer, byte []>();
- private final Map<Integer, TreeMap<HStoreKey, byte []>> targetColumns =
- new ConcurrentHashMap<Integer, TreeMap<HStoreKey, byte []>>();
- protected final Map<Integer, Store> stores =
- new ConcurrentHashMap<Integer, Store>();
+ protected final Map<byte [], Store> stores =
+ new ConcurrentSkipListMap<byte [], Store>(KeyValue.FAMILY_COMPARATOR);
final AtomicLong memcacheSize = new AtomicLong(0);
// This is the table subdirectory.
@@ -136,6 +135,8 @@
final HRegionInfo regionInfo;
final Path regiondir;
private final Path regionCompactionDir;
+ KeyValue.KVComparator comparator;
+ private KeyValue.KVComparator comparatorIgnoreTimestamp;
/*
* Set this when scheduling compaction if want the next compaction to be a
@@ -196,10 +197,6 @@
private long minSequenceId;
final AtomicInteger activeScannerCount = new AtomicInteger(0);
- //////////////////////////////////////////////////////////////////////////////
- // Constructor
- //////////////////////////////////////////////////////////////////////////////
-
/**
* HRegion constructor.
*
@@ -219,10 +216,14 @@
* @param flushListener an object that implements CacheFlushListener or null
* making progress to master -- otherwise master might think region deploy
* failed. Can be null.
+ * @throws IOException
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
HRegionInfo regionInfo, FlushRequester flushListener) {
this.basedir = basedir;
+ this.comparator = regionInfo.getComparator();
+ this.comparatorIgnoreTimestamp =
+ this.comparator.getComparatorIgnoringTimestamps();
this.log = log;
this.fs = fs;
this.conf = conf;
@@ -272,7 +273,7 @@
long minSeqId = Integer.MAX_VALUE;
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
Store store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
- this.stores.put(Bytes.mapKey(c.getName()), store);
+ this.stores.put(c.getName(), store);
long storeSeqId = store.getMaxSequenceId();
if (storeSeqId > maxSeqId) {
maxSeqId = storeSeqId;
@@ -524,7 +525,7 @@
* @return two brand-new (and open) HRegions or null if a split is not needed
* @throws IOException
*/
- HRegion[] splitRegion(final byte [] splitRow) throws IOException {
+ HRegion [] splitRegion(final byte [] splitRow) throws IOException {
synchronized (splitLock) {
if (closed.get()) {
return null;
@@ -532,11 +533,13 @@
// Add start/end key checking: hbase-428.
byte [] startKey = this.regionInfo.getStartKey();
byte [] endKey = this.regionInfo.getEndKey();
- if (HStoreKey.equalsTwoRowKeys(startKey, splitRow)) {
+ if (this.comparator.matchingRows(startKey, 0, startKey.length,
+ splitRow, 0, splitRow.length)) {
LOG.debug("Startkey and midkey are same, not splitting");
return null;
}
- if (HStoreKey.equalsTwoRowKeys(splitRow, endKey)) {
+ if (this.comparator.matchingRows(splitRow, 0, splitRow.length,
+ endKey, 0, endKey.length)) {
LOG.debug("Endkey and midkey are same, not splitting");
return null;
}
@@ -704,10 +707,10 @@
doRegionCompactionPrep();
long maxSize = -1;
for (Store store: stores.values()) {
- final Store.StoreSize size = store.compact(majorCompaction);
- if (size != null && size.getSize() > maxSize) {
- maxSize = size.getSize();
- splitRow = size.getSplitRow();
+ final Store.StoreSize ss = store.compact(majorCompaction);
+ if (ss != null && ss.getSize() > maxSize) {
+ maxSize = ss.getSize();
+ splitRow = ss.getSplitRow();
}
}
doRegionCompactionCleanup();
@@ -934,16 +937,14 @@
* @param column
* @param ts
* @param nv
- * @return array of values one element per version that matches the timestamp,
- * or null if there are no matches.
+ * @return Results or null if none.
* @throws IOException
*/
- public Cell[] get(final byte[] row, final byte[] column, final long ts,
+ public List<KeyValue> get(final byte[] row, final byte[] column, final long ts,
final int nv)
throws IOException {
- long timestamp = ts == -1 ? HConstants.LATEST_TIMESTAMP : ts;
- int numVersions = nv == -1 ? 1 : nv;
-
+ long timestamp = ts == -1? HConstants.LATEST_TIMESTAMP : ts;
+ int numVersions = nv == -1? 1 : nv;
splitsAndClosesLock.readLock().lock();
try {
if (this.closed.get()) {
@@ -953,17 +954,98 @@
checkRow(row);
checkColumn(column);
// Don't need a row lock for a simple get
- HStoreKey key = new HStoreKey(row, column, timestamp);
- Cell[] result = getStore(column).get(key, numVersions);
+ List<KeyValue> result = getStore(column).
+ get(KeyValue.createFirstOnRow(row, column, timestamp), numVersions);
// Guarantee that we return null instead of a zero-length array,
// if there are no results to return.
- return (result == null || result.length == 0)? null : result;
+ return (result == null || result.isEmpty())? null : result;
} finally {
splitsAndClosesLock.readLock().unlock();
}
}
/**
+   * Data structure with a counter that is accessible rather than creating a
+   * new Integer every time we want to increment the counter. Initializes at count 1.
+ */
+ static class Counter {
+ int counter = 1;
+ }
+
+ /*
+ * Check to see if we've not gone over threshold for this particular
+ * column.
+ * @param kv
+ * @param versions
+ * @param versionsCount
+ * @return True if its ok to add current value.
+ */
+ static boolean okToAddResult(final KeyValue kv, final int versions,
+ final Map<KeyValue, HRegion.Counter> versionsCount) {
+ if (versionsCount == null) {
+ return true;
+ }
+ if (versionsCount.containsKey(kv)) {
+ if (versionsCount.get(kv).counter < versions) {
+ return true;
+ }
+ } else {
+ return true;
+ }
+ return false;
+ }
+
+ /*
+   * Used when adding a found item to the list of results being gathered.
+ * @param kv
+ * @param versionsCount
+ * @param results
+ */
+ static void addResult(final KeyValue kv,
+ final Map<KeyValue, HRegion.Counter> versionsCount,
+ final List<KeyValue> results) {
+ // Don't add if already present; i.e. ignore second entry.
+ if (results.contains(kv)) return;
+ results.add(kv);
+ if (versionsCount == null) {
+ return;
+ }
+ if (!versionsCount.containsKey(kv)) {
+ versionsCount.put(kv, new HRegion.Counter());
+ } else {
+ versionsCount.get(kv).counter++;
+ }
+ }
+
+ /*
+ * @param versions Number of versions to get.
+ * @param versionsCount May be null.
+ * @param columns Columns we want to fetch.
+ * @return True if has enough versions.
+ */
+ static boolean hasEnoughVersions(final int versions,
+ final Map<KeyValue, HRegion.Counter> versionsCount,
+ final Set<byte []> columns) {
+ if (columns == null || versionsCount == null) {
+ // Wants all columns so just keep going
+ return false;
+ }
+ if (columns.size() > versionsCount.size()) {
+ return false;
+ }
+ if (versions == 1) {
+ return true;
+ }
+ // Need to look at each to make sure at least versions.
+ for (Map.Entry<KeyValue, HRegion.Counter> e: versionsCount.entrySet()) {
+ if (e.getValue().counter < versions) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
* Fetch all the columns for the indicated row at a specified timestamp.
* Returns a HbaseMapWritable that maps column names to values.
*
@@ -982,7 +1064,7 @@
* @throws IOException
*/
public HbaseMapWritable<byte [], Cell> getFull(final byte [] row,
- final Set<byte []> columns, final long ts,
+ final NavigableSet<byte []> columns, final long ts,
final int numVersions, final Integer lockid)
throws IOException {
// Check columns passed
@@ -991,16 +1073,16 @@
checkColumn(column);
}
}
- HStoreKey key = new HStoreKey(row, ts);
+ List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+ Map<KeyValue, Counter> versionCounter =
+ new TreeMap<KeyValue, Counter>(this.comparatorIgnoreTimestamp);
Integer lid = getLock(lockid,row);
HashSet<Store> storeSet = new HashSet<Store>();
try {
- HbaseMapWritable<byte [], Cell> result =
- new HbaseMapWritable<byte [], Cell>();
// Get the concerned columns or all of them
if (columns != null) {
for (byte[] bs : columns) {
- Store store = stores.get(Bytes.mapKey(HStoreKey.getFamily(bs)));
+ Store store = stores.get(bs);
if (store != null) {
storeSet.add(store);
}
@@ -1008,25 +1090,31 @@
} else {
storeSet.addAll(stores.values());
}
+ long timestamp =
+ (ts == HConstants.LATEST_TIMESTAMP)? System.currentTimeMillis(): ts;
+ KeyValue key = KeyValue.createFirstOnRow(row, timestamp);
// For each column name that is just a column family, open the store
// related to it and fetch everything for that row. HBASE-631
// Also remove each store from storeSet so that these stores
// won't be opened for no reason. HBASE-783
if (columns != null) {
- for (byte[] bs : columns) {
- if (HStoreKey.getFamilyDelimiterIndex(bs) == (bs.length - 1)) {
- Store store = stores.get(Bytes.mapKey(HStoreKey.getFamily(bs)));
- store.getFull(key, null, numVersions, result);
+ for (byte [] bs : columns) {
+ // TODO: Fix so we use comparator in KeyValue that looks at
+ // column family portion only.
+ if (KeyValue.getFamilyDelimiterIndex(bs, 0, bs.length) == (bs.length - 1)) {
+ Store store = stores.get(bs);
+ store.getFull(key, null, null, numVersions, versionCounter,
+ keyvalues, timestamp);
storeSet.remove(store);
}
}
}
-
for (Store targetStore: storeSet) {
- targetStore.getFull(key, columns, numVersions, result);
+ targetStore.getFull(key, columns, null, numVersions, versionCounter,
+ keyvalues, timestamp);
}
- return result;
+ return Cell.createCells(keyvalues);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@@ -1057,77 +1145,35 @@
* @throws IOException
*/
public RowResult getClosestRowBefore(final byte [] row,
- final byte [] columnFamily)
+ final byte [] columnFamily)
throws IOException{
// look across all the HStores for this region and determine what the
// closest key is across all column families, since the data may be sparse
- HStoreKey key = null;
+ KeyValue key = null;
checkRow(row);
splitsAndClosesLock.readLock().lock();
try {
Store store = getStore(columnFamily);
+ KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
// get the closest key. (HStore.getRowKeyAtOrBefore can return null)
- byte [] closestKey = store.getRowKeyAtOrBefore(row);
- // If it happens to be an exact match, we can stop.
- // Otherwise, we need to check if it's the max and move to the next
- if (closestKey != null) {
- if (HStoreKey.equalsTwoRowKeys(row, closestKey)) {
- key = new HStoreKey(closestKey);
- }
- if (key == null) {
- key = new HStoreKey(closestKey);
- }
- }
+ key = store.getRowKeyAtOrBefore(kv);
if (key == null) {
return null;
}
-
- // Now that we've found our key, get the values
- HbaseMapWritable<byte [], Cell> cells =
- new HbaseMapWritable<byte [], Cell>();
- // This will get all results for this store.
- store.getFull(key, null, 1, cells);
- return new RowResult(key.getRow(), cells);
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ // This will get all results for this store. TODO: Do I have to make a
+ // new key?
+ if (!this.comparator.matchingRows(kv, key)) {
+ kv = new KeyValue(key.getRow(), HConstants.LATEST_TIMESTAMP);
+ }
+ store.getFull(kv, null, null, 1, null, results, System.currentTimeMillis());
+ // Convert to RowResult. TODO: Remove need to do this.
+ return RowResult.createRowResult(results);
} finally {
splitsAndClosesLock.readLock().unlock();
}
}
- /*
- * Get <code>versions</code> keys matching the origin key's
- * row/column/timestamp and those of an older vintage.
- * Public so available when debugging.
- * @param origin Where to start searching.
- * @param versions How many versions to return. Pass HConstants.ALL_VERSIONS
- * to retrieve all.
- * @return Ordered list of <code>versions</code> keys going from newest back.
- * @throws IOException
- */
- private Set<HStoreKey> getKeys(final HStoreKey origin, final int versions)
- throws IOException {
- Set<HStoreKey> keys = new TreeSet<HStoreKey>();
- Collection<Store> storesToCheck = null;
- if (origin.getColumn() == null || origin.getColumn().length == 0) {
- // All families
- storesToCheck = this.stores.values();
- } else {
- storesToCheck = new ArrayList<Store>(1);
- storesToCheck.add(getStore(origin.getColumn()));
- }
- long now = System.currentTimeMillis();
- for (Store targetStore: storesToCheck) {
- if (targetStore != null) {
- // Pass versions without modification since in the store getKeys, it
- // includes the size of the passed <code>keys</code> array when counting.
- List<HStoreKey> r = targetStore.getKeys(origin, versions, now, null);
- if (r != null) {
- keys.addAll(r);
- }
- }
- }
- return keys;
- }
-
/**
* Return an iterator that scans over the HRegion, returning the indicated
* columns for only the rows that match the data filter. This Iterator must
@@ -1153,13 +1199,18 @@
throw new IOException("Region " + this + " closed");
}
HashSet<Store> storeSet = new HashSet<Store>();
+ NavigableSet<byte []> columns =
+ new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+ // Below we make up set of stores we want scanners on and we fill out the
+ // list of columns.
for (int i = 0; i < cols.length; i++) {
- Store s = stores.get(Bytes.mapKey(HStoreKey.getFamily(cols[i])));
+ columns.add(cols[i]);
+ Store s = stores.get(cols[i]);
if (s != null) {
storeSet.add(s);
}
}
- return new HScanner(cols, firstRow, timestamp,
+ return new HScanner(columns, firstRow, timestamp,
storeSet.toArray(new Store [storeSet.size()]), filter);
} finally {
newScannerLock.readLock().unlock();
@@ -1206,13 +1257,13 @@
public void batchUpdate(BatchUpdate b, Integer lockid, boolean writeToWAL)
throws IOException {
checkReadOnly();
+ validateValuesLength(b);
// Do a rough check that we have resources to accept a write. The check is
// 'rough' in that between the resource check and the call to obtain a
// read lock, resources may run out. For now, the thought is that this
// will be extremely rare; we'll deal with it when it happens.
checkResources();
-
splitsAndClosesLock.readLock().lock();
try {
// We obtain a per-row lock, so other clients will block while one client
@@ -1222,50 +1273,53 @@
// invokes a HRegion#abort.
byte [] row = b.getRow();
// If we did not pass an existing row lock, obtain a new one
- Integer lid = getLock(lockid,row);
- long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ?
- System.currentTimeMillis() : b.getTimestamp();
+ Integer lid = getLock(lockid, row);
+ long commitTime = b.getTimestamp() == LATEST_TIMESTAMP?
+ System.currentTimeMillis(): b.getTimestamp();
+ Set<byte []> latestTimestampDeletes = null;
+ List<KeyValue> edits = new ArrayList<KeyValue>();
try {
- List<byte []> deletes = null;
for (BatchOperation op: b) {
- HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime);
- byte[] val = null;
+ byte [] column = op.getColumn();
+ checkColumn(column);
+ KeyValue kv = null;
if (op.isPut()) {
- val = op.getValue();
+ byte [] val = op.getValue();
if (HLogEdit.isDeleted(val)) {
- throw new IOException("Cannot insert value: " + val);
+ throw new IOException("Cannot insert value: " +
+ Bytes.toString(val));
}
+ kv = new KeyValue(row, column, commitTime, val);
} else {
+ // Its a delete.
if (b.getTimestamp() == LATEST_TIMESTAMP) {
- // Save off these deletes
- if (deletes == null) {
- deletes = new ArrayList<byte []>();
+ // Save off these deletes of the most recent thing added on the
+ // family.
+ if (latestTimestampDeletes == null) {
+ latestTimestampDeletes =
+ new TreeSet<byte []>(Bytes.BYTES_RAWCOMPARATOR);
}
- deletes.add(op.getColumn());
- } else {
- val = HLogEdit.DELETED_BYTES;
+ latestTimestampDeletes.add(op.getColumn());
+ continue;
}
+ // Its an explicit timestamp delete
+ kv = new KeyValue(row, column, commitTime, KeyValue.Type.Delete,
+ HConstants.EMPTY_BYTE_ARRAY);
}
- if (val != null) {
- localput(lid, key, val);
- }
+ edits.add(kv);
}
- TreeMap<HStoreKey, byte[]> edits =
- this.targetColumns.remove(lid);
-
- if (edits != null && edits.size() > 0) {
+ if (!edits.isEmpty()) {
update(edits, writeToWAL);
}
-
- if (deletes != null && deletes.size() > 0) {
- // We have some LATEST_TIMESTAMP deletes to run.
- for (byte [] column: deletes) {
+ if (latestTimestampDeletes != null &&
+ !latestTimestampDeletes.isEmpty()) {
+ // We have some LATEST_TIMESTAMP deletes to run. Can't do them inline
+ // as edits. Need to do individually after figuring which is latest
+ // timestamp to delete.
+ for (byte [] column: latestTimestampDeletes) {
deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
}
}
- } catch (IOException e) {
- this.targetColumns.remove(Long.valueOf(lid));
- throw e;
} finally {
if(lockid == null) releaseRowLock(lid);
}
@@ -1274,7 +1328,6 @@
}
}
-
/**
* Performs an atomic check and save operation. Checks if
* the specified expected values have changed, and if not
@@ -1297,66 +1350,72 @@
// should read the comments from the batchUpdate method
boolean success = true;
checkReadOnly();
+ validateValuesLength(b);
checkResources();
splitsAndClosesLock.readLock().lock();
try {
byte[] row = b.getRow();
Integer lid = getLock(lockid,row);
try {
- Set<byte[]> keySet = expectedValues.keySet();
- Map<byte[],Cell> actualValues = this.getFull(row,keySet,
- HConstants.LATEST_TIMESTAMP, 1,lid);
+ NavigableSet<byte []> keySet =
+ new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+ keySet.addAll(expectedValues.keySet());
+ Map<byte[],Cell> actualValues = getFull(row, keySet,
+ HConstants.LATEST_TIMESTAMP, 1,lid);
for (byte[] key : keySet) {
// If test fails exit
if(!Bytes.equals(actualValues.get(key).getValue(),
- expectedValues.get(key))) {
+ expectedValues.get(key))) {
success = false;
break;
}
}
-
if (success) {
long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
System.currentTimeMillis(): b.getTimestamp();
- List<byte []> deletes = null;
+ Set<byte []> latestTimestampDeletes = null;
+ List<KeyValue> edits = new ArrayList<KeyValue>();
for (BatchOperation op: b) {
- HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime);
- byte[] val = null;
+ byte [] column = op.getColumn();
+ KeyValue kv = null;
if (op.isPut()) {
- val = op.getValue();
+ byte [] val = op.getValue();
if (HLogEdit.isDeleted(val)) {
- throw new IOException("Cannot insert value: " + val);
+ throw new IOException("Cannot insert value: " +
+ Bytes.toString(val));
}
+ kv = new KeyValue(row, column, commitTime, val);
} else {
+ // Its a delete.
if (b.getTimestamp() == LATEST_TIMESTAMP) {
- // Save off these deletes
- if (deletes == null) {
- deletes = new ArrayList<byte []>();
+ // Save off these deletes of the most recent thing added on
+ // the family.
+ if (latestTimestampDeletes == null) {
+ latestTimestampDeletes =
+ new TreeSet<byte []>(Bytes.BYTES_RAWCOMPARATOR);
}
- deletes.add(op.getColumn());
+ latestTimestampDeletes.add(op.getColumn());
} else {
- val = HLogEdit.DELETED_BYTES;
+ // Its an explicit timestamp delete
+ kv = new KeyValue(row, column, commitTime,
+ KeyValue.Type.Delete, HConstants.EMPTY_BYTE_ARRAY);
}
}
- if (val != null) {
- localput(lid, key, val);
- }
+ edits.add(kv);
}
- TreeMap<HStoreKey, byte[]> edits =
- this.targetColumns.remove(lid);
- if (edits != null && edits.size() > 0) {
+ if (!edits.isEmpty()) {
update(edits, writeToWAL);
}
- if (deletes != null && deletes.size() > 0) {
- // We have some LATEST_TIMESTAMP deletes to run.
- for (byte [] column: deletes) {
+ if (latestTimestampDeletes != null &&
+ !latestTimestampDeletes.isEmpty()) {
+ // We have some LATEST_TIMESTAMP deletes to run. Can't do them inline
+ // as edits. Need to do individually after figuring which is latest
+ // timestamp to delete.
+ for (byte [] column: latestTimestampDeletes) {
deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
}
}
}
- } catch (IOException e) {
- this.targetColumns.remove(Long.valueOf(lid));
- throw e;
} finally {
if(lockid == null) releaseRowLock(lid);
}
@@ -1367,6 +1426,31 @@
}
/*
+ * Utility method to verify values length
+ * @param batchUpdate The update to verify
+ * @throws IOException Thrown if a value is too long
+ */
+ private void validateValuesLength(BatchUpdate batchUpdate)
+ throws IOException {
+ for (Iterator<BatchOperation> iter =
+ batchUpdate.iterator(); iter.hasNext();) {
+ BatchOperation operation = iter.next();
+ if (operation.getValue() != null) {
+ HColumnDescriptor fam = this.regionInfo.getTableDesc().
+ getFamily(operation.getColumn());
+ if (fam != null) {
+ int maxLength = fam.getMaxValueLength();
+ if (operation.getValue().length > maxLength) {
+ throw new ValueOverMaxLengthException("Value in column "
+ + Bytes.toString(operation.getColumn()) + " is too long. "
+ + operation.getValue().length + " instead of " + maxLength);
+ }
+ }
+ }
+ }
+ }
+
+ /*
* Check if resources to support an update.
*
* Here we synchronize on HRegion, a broad scoped lock. Its appropriate
@@ -1417,9 +1501,8 @@
checkReadOnly();
Integer lid = getLock(lockid,row);
try {
- // Delete ALL versions rather than MAX_VERSIONS. If we just did
- // MAX_VERSIONS, then if 2* MAX_VERSION cells, subsequent gets would
- // get old stuff.
+ // Delete ALL versions rather than column family VERSIONS. If we just did
+ // VERSIONS, then if 2* VERSION cells, subsequent gets would get old stuff.
deleteMultiple(row, column, ts, ALL_VERSIONS);
} finally {
if(lockid == null) releaseRowLock(lid);
@@ -1433,25 +1516,28 @@
* @param lockid Row lock
* @throws IOException
*/
- @SuppressWarnings("unchecked")
public void deleteAll(final byte [] row, final long ts, final Integer lockid)
throws IOException {
checkReadOnly();
Integer lid = getLock(lockid, row);
- long now = System.currentTimeMillis();
+ long time = ts;
+ if (ts == HConstants.LATEST_TIMESTAMP) {
+ time = System.currentTimeMillis();
+ }
+ KeyValue kv = KeyValue.createFirstOnRow(row, time);
try {
for (Store store : stores.values()) {
- List<HStoreKey> keys = store.getKeys(new HStoreKey(row, ts),
- ALL_VERSIONS, now, null);
- TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
- HStoreKey.getWritableComparator(store.getHRegionInfo()));
- for (HStoreKey key: keys) {
- edits.put(key, HLogEdit.DELETED_BYTES);
+ List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+ store.getFull(kv, null, null, ALL_VERSIONS, null, keyvalues, time);
+ List<KeyValue> edits = new ArrayList<KeyValue>();
+ for (KeyValue key: keyvalues) {
+ // This is UGLY. COPY OF KEY PART OF KeyValue.
+ edits.add(key.cloneDelete());
}
update(edits);
}
} finally {
- if(lockid == null) releaseRowLock(lid);
+ if (lockid == null) releaseRowLock(lid);
}
}
@@ -1465,21 +1551,21 @@
* @param lockid Row lock
* @throws IOException
*/
- @SuppressWarnings("unchecked")
public void deleteAllByRegex(final byte [] row, final String columnRegex,
final long timestamp, final Integer lockid) throws IOException {
checkReadOnly();
Pattern columnPattern = Pattern.compile(columnRegex);
Integer lid = getLock(lockid, row);
long now = System.currentTimeMillis();
+ KeyValue kv = new KeyValue(row, timestamp);
try {
for (Store store : stores.values()) {
- List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp),
- ALL_VERSIONS, now, columnPattern);
- TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
- HStoreKey.getWritableComparator(store.getHRegionInfo()));
- for (HStoreKey key: keys) {
- edits.put(key, HLogEdit.DELETED_BYTES);
+ List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+ store.getFull(kv, null, columnPattern, ALL_VERSIONS, null, keyvalues,
+ now);
+ List<KeyValue> edits = new ArrayList<KeyValue>();
+ for (KeyValue key: keyvalues) {
+ edits.add(key.cloneDelete());
}
update(edits);
}
@@ -1498,7 +1584,6 @@
* @param lockid Row lock
* @throws IOException
*/
- @SuppressWarnings("unchecked")
public void deleteFamily(byte [] row, byte [] family, long timestamp,
final Integer lockid)
throws IOException{
@@ -1509,20 +1594,20 @@
// find the HStore for the column family
Store store = getStore(family);
// find all the keys that match our criteria
- List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp),
- ALL_VERSIONS, now, null);
+ List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+ store.getFull(new KeyValue(row, timestamp), null, null, ALL_VERSIONS,
+ null, keyvalues, now);
// delete all the cells
- TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
- HStoreKey.getWritableComparator(store.getHRegionInfo()));
- for (HStoreKey key: keys) {
- edits.put(key, HLogEdit.DELETED_BYTES);
+ List<KeyValue> edits = new ArrayList<KeyValue>();
+ for (KeyValue kv: keyvalues) {
+ edits.add(kv.cloneDelete());
}
update(edits);
} finally {
if(lockid == null) releaseRowLock(lid);
}
}
-
+
/**
* Delete all cells for a row with all the matching column families by
* familyRegex with timestamps less than or equal to <i>timestamp</i>.
@@ -1534,14 +1619,15 @@
* @param lockid Row lock
* @throws IOException
*/
- @SuppressWarnings("unchecked")
- public void deleteFamilyByRegex(byte [] row, String familyRegex, long timestamp,
- final Integer lockid) throws IOException {
+ public void deleteFamilyByRegex(byte [] row, String familyRegex,
+ final long timestamp, final Integer lockid)
+ throws IOException {
checkReadOnly();
// construct the family regex pattern
Pattern familyPattern = Pattern.compile(familyRegex);
Integer lid = getLock(lockid, row);
long now = System.currentTimeMillis();
+ KeyValue kv = new KeyValue(row, timestamp);
try {
for(Store store: stores.values()) {
String familyName = Bytes.toString(store.getFamily().getName());
@@ -1549,12 +1635,11 @@
if(!(familyPattern.matcher(familyName).matches()))
continue;
- List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp),
- ALL_VERSIONS, now, null);
- TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
- HStoreKey.getWritableComparator(store.getHRegionInfo()));
- for (HStoreKey key: keys) {
- edits.put(key, HLogEdit.DELETED_BYTES);
+ List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+ store.getFull(kv, null, null, ALL_VERSIONS, null, keyvalues, now);
+ List<KeyValue> edits = new ArrayList<KeyValue>();
+ for (KeyValue k: keyvalues) {
+ edits.add(k.cloneDelete());
}
update(edits);
}
@@ -1574,21 +1659,23 @@
* {@link HConstants#ALL_VERSIONS} to delete all.
* @throws IOException
*/
- @SuppressWarnings("unchecked")
private void deleteMultiple(final byte [] row, final byte [] column,
final long ts, final int versions)
throws IOException {
checkReadOnly();
- HStoreKey origin = new HStoreKey(row, column, ts);
- Set<HStoreKey> keys = getKeys(origin, versions);
- if (keys.size() > 0) {
- // I think the below map doesn't have to be exactly storetd. Its deletes
- // they don't have to go in in exact sorted order (we don't have to worry
+ // We used to have a getKeys method that purportedly only got the keys and
+ // not the keys and values. We now just do getFull. For memcache values,
+ // shouldn't matter if we get key and value since it'll be the entry that
+ // is in memcache. For the keyvalues from storefile, could be saving if
+ // we only returned key component. TODO.
+ List<KeyValue> keys = get(row, column, ts, versions);
+ if (keys != null && keys.size() > 0) {
+      // I think the below edits don't have to be sorted. They're deletes.
+      // They don't have to go in exact sorted order (we don't have to worry
// about the meta or root sort comparator here).
- TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
- new HStoreKey.HStoreKeyComparator());
- for (HStoreKey key: keys) {
- edits.put(key, HLogEdit.DELETED_BYTES);
+ List<KeyValue> edits = new ArrayList<KeyValue>();
+ for (KeyValue key: keys) {
+ edits.add(key.cloneDelete());
}
update(edits);
}
@@ -1610,16 +1697,14 @@
checkRow(row);
Integer lid = getLock(lockid, row);
try {
- HStoreKey origin;
+ NavigableSet<byte []> columns = null;
if (column != null) {
- origin = new HStoreKey(row, column, timestamp);
- } else {
- origin = new HStoreKey(row, timestamp);
+ columns = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+ columns.add(column);
}
- return !getKeys(origin, 1).isEmpty();
+ return !getFull(row, columns, timestamp, 1, lid).isEmpty();
} finally {
- if (lockid == null)
- releaseRowLock(lid);
+ if (lockid == null) releaseRowLock(lid);
}
}
@@ -1631,44 +1716,15 @@
throw new IOException("region is read only");
}
}
-
- /**
- * Private implementation.
- *
- * localput() is used for both puts and deletes. We just place the values
- * into a per-row pending area, until a commit() or abort() call is received.
- * (Or until the user's write-lock expires.)
- *
- * @param lockid
- * @param key
- * @param val Value to enter into cell
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- private void localput(final Integer lockid, final HStoreKey key,
- final byte [] val)
- throws IOException {
- checkColumn(key.getColumn());
- checkReadOnly();
- TreeMap<HStoreKey, byte []> targets = this.targetColumns.get(lockid);
- if (targets == null) {
- // I think the below map doesn't have to be exactly storetd. Its deletes
- // they don't have to go in in exact sorted order (we don't have to worry
- // about the meta or root sort comparator here).
- targets = new TreeMap<HStoreKey, byte []>(new HStoreKey.HStoreKeyComparator());
- this.targetColumns.put(lockid, targets);
- }
- targets.put(key, val);
- }
-
+
/**
* Add updates first to the hlog and then add values to memcache.
* Warning: Assumption is caller has lock on passed in row.
- * @param updatesByColumn Cell updates by column
+ * @param edits Cell updates by column
* @throws IOException
*/
- private void update(final TreeMap<HStoreKey, byte []> updatesByColumn) throws IOException {
- this.update(updatesByColumn, true);
+ private void update(final List<KeyValue> edits) throws IOException {
+ this.update(edits, true);
}
/**
@@ -1678,26 +1734,23 @@
* @param updatesByColumn Cell updates by column
* @throws IOException
*/
- private void update(final TreeMap<HStoreKey, byte []> updatesByColumn,
- boolean writeToWAL)
+ private void update(final List<KeyValue> edits, boolean writeToWAL)
throws IOException {
- if (updatesByColumn == null || updatesByColumn.size() <= 0) {
+ if (edits == null || edits.isEmpty()) {
return;
}
- checkReadOnly();
boolean flush = false;
this.updatesLock.readLock().lock();
try {
if (writeToWAL) {
this.log.append(regionInfo.getRegionName(),
- regionInfo.getTableDesc().getName(), updatesByColumn,
+ regionInfo.getTableDesc().getName(), edits,
(regionInfo.isMetaRegion() || regionInfo.isRootRegion()));
}
long size = 0;
- for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
- HStoreKey key = e.getKey();
- size = this.memcacheSize.addAndGet(
- getStore(key.getColumn()).add(key, e.getValue()));
+ for (KeyValue kv: edits) {
+ // TODO: Fix -- do I have to do a getColumn here?
+ size = this.memcacheSize.addAndGet(getStore(kv.getColumn()).add(kv));
}
flush = isFlushSize(size);
} finally {
@@ -1757,9 +1810,9 @@
* TODO: Make this lookup faster.
*/
public Store getStore(final byte [] column) {
- return this.stores.get(HStoreKey.getFamilyMapKey(column));
+ return this.stores.get(column);
}
-
+
//////////////////////////////////////////////////////////////////////////////
// Support code
//////////////////////////////////////////////////////////////////////////////
@@ -1780,21 +1833,14 @@
* @param columnName
* @throws NoSuchColumnFamilyException
*/
- private void checkColumn(final byte [] columnName)
- throws NoSuchColumnFamilyException, ColumnNameParseException {
- if (columnName == null) {
+ private void checkColumn(final byte [] column)
+ throws NoSuchColumnFamilyException {
+ if (column == null) {
return;
}
-
- int index = HStoreKey.getFamilyDelimiterIndex(columnName);
- if (index <= 0) {
- throw new ColumnNameParseException(Bytes.toString(columnName) +
- " is missing column family delimiter '" +
- HStoreKey.COLUMN_FAMILY_DELIMITER + "'");
- }
- if (!regionInfo.getTableDesc().hasFamily(columnName, index)) {
+ if (!regionInfo.getTableDesc().hasFamily(column)) {
throw new NoSuchColumnFamilyException("Column family on " +
- Bytes.toString(columnName) + " does not exist in region " + this
+ Bytes.toString(column) + " does not exist in region " + this
+ " in table " + regionInfo.getTableDesc());
}
}
@@ -1893,7 +1939,7 @@
if (lockid == null) {
lid = obtainRowLock(row);
} else {
- if(!isRowLocked(lockid)) {
+ if (!isRowLocked(lockid)) {
throw new IOException("Invalid row lock");
}
lid = lockid;
@@ -1939,31 +1985,26 @@
*/
private class HScanner implements InternalScanner {
private InternalScanner[] scanners;
- private TreeMap<byte [], Cell>[] resultSets;
- private HStoreKey[] keys;
+ private List<KeyValue> [] resultSets;
private RowFilterInterface filter;
- private HStoreKey.StoreKeyComparator comparator;
/** Create an HScanner with a handle on many HStores. */
@SuppressWarnings("unchecked")
- HScanner(byte [][] cols, byte [] firstRow, long timestamp, Store [] stores,
- RowFilterInterface filter)
+ HScanner(final NavigableSet<byte []> columns, byte [] firstRow,
+ long timestamp, final Store [] stores, final RowFilterInterface filter)
throws IOException {
this.filter = filter;
this.scanners = new InternalScanner[stores.length];
try {
- this.comparator = HStoreKey.getRawComparator(regionInfo);
-
for (int i = 0; i < stores.length; i++) {
// Only pass relevant columns to each store
- List<byte[]> columns = new ArrayList<byte[]>();
- for (int j = 0; j < cols.length; j++) {
- if (Bytes.equals(HStoreKey.getFamily(cols[j]),
- stores[i].getFamily().getName())) {
- columns.add(cols[j]);
+ NavigableSet<byte[]> columnSubset =
+ new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ for (byte [] c: columns) {
+ if (KeyValue.FAMILY_COMPARATOR.compare(stores[i].storeName, c) == 0) {
+ columnSubset.add(c);
}
}
-
RowFilterInterface f = filter;
if (f != null) {
// Need to replicate filters.
@@ -1971,12 +2012,11 @@
// one shared across many rows. See HADOOP-2467.
f = WritableUtils.clone(filter, conf);
}
- scanners[i] = stores[i].getScanner(timestamp,
- columns.toArray(new byte[columns.size()][]), firstRow, f);
+ scanners[i] = stores[i].getScanner(timestamp, columnSubset, firstRow, f);
}
} catch (IOException e) {
for (int i = 0; i < this.scanners.length; i++) {
- if(scanners[i] != null) {
+ if (scanners[i] != null) {
closeScanner(i);
}
}
@@ -1985,12 +2025,10 @@
// Advance to the first key in each store.
// All results will match the required column-set and scanTime.
- this.resultSets = new TreeMap[scanners.length];
- this.keys = new HStoreKey[scanners.length];
+ this.resultSets = new List[scanners.length];
for (int i = 0; i < scanners.length; i++) {
- keys[i] = new HStoreKey(HConstants.EMPTY_BYTE_ARRAY);
- resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
- if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
+ resultSets[i] = new ArrayList<KeyValue>();
+ if(scanners[i] != null && !scanners[i].next(resultSets[i])) {
closeScanner(i);
}
}
@@ -2000,81 +2038,61 @@
activeScannerCount.incrementAndGet();
}
- public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+ @SuppressWarnings("null")
+ public boolean next(List<KeyValue> results)
throws IOException {
boolean moreToFollow = false;
boolean filtered = false;
-
do {
- // Find the lowest-possible key.
- byte [] chosenRow = null;
+ // Find the lowest key across all stores.
+ KeyValue chosen = null;
long chosenTimestamp = -1;
- for (int i = 0; i < this.keys.length; i++) {
- if (scanners[i] != null &&
- (chosenRow == null ||
- (this.comparator.compareRows(this.keys[i].getRow(), chosenRow) < 0) ||
- ((this.comparator.compareRows(this.keys[i].getRow(), chosenRow) == 0) &&
- (keys[i].getTimestamp() > chosenTimestamp)))) {
- chosenRow = keys[i].getRow();
- chosenTimestamp = keys[i].getTimestamp();
+ for (int i = 0; i < this.scanners.length; i++) {
+ if (this.resultSets[i] == null || this.resultSets[i].isEmpty()) {
+ continue;
+ }
+ KeyValue kv = this.resultSets[i].get(0);
+ if (chosen == null ||
+ (comparator.compareRows(kv, chosen) < 0) ||
+ ((comparator.compareRows(kv, chosen) == 0) &&
+ (kv.getTimestamp() > chosenTimestamp))) {
+ chosen = kv;
+ chosenTimestamp = chosen.getTimestamp();
}
}
- // Store the key and results for each sub-scanner. Merge them as
- // appropriate.
+ // Store results from each sub-scanner.
if (chosenTimestamp >= 0) {
- // Here we are setting the passed in key with current row+timestamp
- key.setRow(chosenRow);
- key.setVersion(chosenTimestamp);
- key.setColumn(HConstants.EMPTY_BYTE_ARRAY);
-
for (int i = 0; i < scanners.length; i++) {
- if (scanners[i] != null &&
- this.comparator.compareRows(this.keys[i].getRow(), chosenRow) == 0) {
- // NOTE: We used to do results.putAll(resultSets[i]);
- // but this had the effect of overwriting newer
- // values with older ones. So now we only insert
- // a result if the map does not contain the key.
- for (Map.Entry<byte [], Cell> e : resultSets[i].entrySet()) {
- if (!results.containsKey(e.getKey())) {
- results.put(e.getKey(), e.getValue());
- }
- }
+ if (this.resultSets[i] == null || this.resultSets[i].isEmpty()) {
+ continue;
+ }
+ KeyValue kv = this.resultSets[i].get(0);
+ if (comparator.compareRows(kv, chosen) == 0) {
+ results.addAll(this.resultSets[i]);
resultSets[i].clear();
- if (!scanners[i].next(keys[i], resultSets[i])) {
+ if (!scanners[i].next(resultSets[i])) {
closeScanner(i);
}
}
}
}
- for (int i = 0; i < scanners.length; i++) {
- // If the current scanner is non-null AND has a lower-or-equal
- // row label, then its timestamp is bad. We need to advance it.
- while ((scanners[i] != null) &&
- (this.comparator.compareRows(this.keys[i].getRow(), chosenRow) <= 0)) {
- resultSets[i].clear();
- if (!scanners[i].next(keys[i], resultSets[i])) {
- closeScanner(i);
- }
- }
- }
-
moreToFollow = chosenTimestamp >= 0;
if (results == null || results.size() <= 0) {
// If we got no results, then there is no more to follow.
moreToFollow = false;
}
-
+
filtered = filter == null ? false : filter.filterRow(results);
-
if (filter != null && filter.filterAllRemaining()) {
moreToFollow = false;
}
if (moreToFollow) {
if (filter != null) {
- filter.rowProcessed(filtered, key.getRow());
+ filter.rowProcessed(filtered, chosen.getBuffer(), chosen.getRowOffset(),
+ chosen.getRowLength());
}
if (filtered) {
results.clear();
@@ -2108,9 +2126,6 @@
if (resultSets != null) {
resultSets[i] = null;
}
- if (keys != null) {
- keys[i] = null;
- }
}
}
@@ -2222,7 +2237,6 @@
*
* @throws IOException
*/
- @SuppressWarnings("unchecked")
public static void addRegionToMETA(HRegion meta, HRegion r)
throws IOException {
meta.checkResources();
@@ -2230,11 +2244,9 @@
byte [] row = r.getRegionName();
Integer lid = meta.obtainRowLock(row);
try {
- HStoreKey key = new HStoreKey(row, COL_REGIONINFO,
- System.currentTimeMillis());
- TreeMap<HStoreKey, byte[]> edits = new TreeMap<HStoreKey, byte[]>(
- HStoreKey.getWritableComparator(r.getRegionInfo()));
- edits.put(key, Writables.getBytes(r.getRegionInfo()));
+ List<KeyValue> edits = new ArrayList<KeyValue>();
+ edits.add(new KeyValue(row, COL_REGIONINFO, System.currentTimeMillis(),
+ Writables.getBytes(r.getRegionInfo())));
meta.update(edits);
} finally {
meta.releaseRowLock(lid);
@@ -2389,7 +2401,6 @@
*/
public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
throws IOException {
-
HRegion a = srcA;
HRegion b = srcB;
@@ -2406,7 +2417,7 @@
b = srcA;
}
- if (!HStoreKey.equalsTwoRowKeys(a.getEndKey(), b.getStartKey())) {
+ if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
throw new IOException("Cannot merge non-adjacent regions");
}
return merge(a, b);
@@ -2450,17 +2461,25 @@
HTableDescriptor tabledesc = a.getTableDesc();
HLog log = a.getLog();
Path basedir = a.getBaseDir();
- final byte [] startKey = HStoreKey.equalsTwoRowKeys(a.getStartKey(),
- EMPTY_BYTE_ARRAY) ||
- HStoreKey.equalsTwoRowKeys(b.getStartKey(), EMPTY_BYTE_ARRAY)?
- EMPTY_BYTE_ARRAY: Bytes.compareTo(a.getStartKey(),
- b.getStartKey()) <= 0?
+ // Presume both are of same region type -- i.e. both user or catalog
+ // table regions. This way can use comparator.
+ final byte [] startKey = a.comparator.matchingRows(a.getStartKey(), 0,
+ a.getStartKey().length,
+ EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length) ||
+ b.comparator.matchingRows(b.getStartKey(), 0, b.getStartKey().length,
+ EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length)?
+ EMPTY_BYTE_ARRAY:
+ a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length,
+ b.getStartKey(), 0, b.getStartKey().length) <= 0?
a.getStartKey(): b.getStartKey();
- final byte [] endKey = HStoreKey.equalsTwoRowKeys(a.getEndKey(),
- EMPTY_BYTE_ARRAY) ||
- HStoreKey.equalsTwoRowKeys(b.getEndKey(), EMPTY_BYTE_ARRAY)?
+ final byte [] endKey = a.comparator.matchingRows(a.getEndKey(), 0,
+ a.getEndKey().length, EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length) ||
+ a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
+ EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length)?
EMPTY_BYTE_ARRAY:
- Bytes.compareTo(a.getEndKey(), b.getEndKey()) <= 0? b.getEndKey(): a.getEndKey();
+ a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
+ b.getEndKey(), 0, b.getEndKey().length) <= 0?
+ b.getEndKey(): a.getEndKey();
HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey);
LOG.info("Creating new region " + newRegionInfo.toString());
@@ -2591,48 +2610,54 @@
}
}
-
- public long incrementColumnValue(byte[] row, byte[] column, long amount) throws IOException {
+ public long incrementColumnValue(byte[] row, byte[] column, long amount)
+ throws IOException {
checkRow(row);
checkColumn(column);
Integer lid = obtainRowLock(row);
splitsAndClosesLock.readLock().lock();
try {
- HStoreKey hsk = new HStoreKey(row, column);
+ KeyValue kv = new KeyValue(row, column);
long ts = System.currentTimeMillis();
byte [] value = null;
Store store = getStore(column);
- List<Cell> c;
+ List<KeyValue> c;
// Try the memcache first.
store.lock.readLock().lock();
try {
- c = store.memcache.get(hsk, 1);
+ c = store.memcache.get(kv, 1);
} finally {
store.lock.readLock().unlock();
}
// Pick the latest value out of List<Cell> c:
if (c.size() >= 1) {
// Use the memcache timestamp value.
- LOG.debug("Overwriting the memcache value for " + Bytes.toString(row) + "/" + Bytes.toString(column));
+ LOG.debug("Overwriting the memcache value for " + Bytes.toString(row) +
+ "/" + Bytes.toString(column));
ts = c.get(0).getTimestamp();
value = c.get(0).getValue();
}
if (value == null) {
// Check the store (including disk) for the previous value.
- Cell[] cell = store.get(hsk, 1);
- if (cell != null && cell.length >= 1) {
- LOG.debug("Using HFile previous value for " + Bytes.toString(row) + "/" + Bytes.toString(column));
- value = cell[0].getValue();
+ c = store.get(kv, 1);
+ if (c != null && c.size() == 1) {
+ LOG.debug("Using HFile previous value for " + Bytes.toString(row) +
+ "/" + Bytes.toString(column));
+ value = c.get(0).getValue();
+ } else if (c != null && c.size() > 1) {
+ throw new DoNotRetryIOException("more than 1 value returned in " +
+ "incrementColumnValue from Store");
}
}
if (value == null) {
// Doesn't exist
- LOG.debug("Creating new counter value for " + Bytes.toString(row) + "/"+ Bytes.toString(column));
+ LOG.debug("Creating new counter value for " + Bytes.toString(row) +
+ "/"+ Bytes.toString(column));
value = Bytes.toBytes(amount);
} else {
value = incrementBytes(value, amount);
@@ -2652,10 +2677,12 @@
// Hopefully this doesn't happen too often.
if (value.length < Bytes.SIZEOF_LONG) {
byte [] newvalue = new byte[Bytes.SIZEOF_LONG];
- System.arraycopy(value, 0, newvalue, newvalue.length - value.length, value.length);
+ System.arraycopy(value, 0, newvalue, newvalue.length - value.length,
+ value.length);
value = newvalue;
} else if (value.length > Bytes.SIZEOF_LONG) {
- throw new DoNotRetryIOException("Increment Bytes - value too big: " + value.length);
+ throw new DoNotRetryIOException("Increment Bytes - value too big: " +
+ value.length);
}
return binaryIncrement(value, amount);
}
@@ -2675,4 +2702,4 @@
}
return value;
}
-}
+}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Apr 12 10:39:55 2009
@@ -37,6 +37,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
@@ -58,7 +59,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -66,8 +66,8 @@
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HServerLoad;
-import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LeaseListener;
import org.apache.hadoop.hbase.Leases;
import org.apache.hadoop.hbase.LocalHBaseCluster;
@@ -76,13 +76,11 @@
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.ValueOverMaxLengthException;
import org.apache.hadoop.hbase.HMsg.Type;
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.client.ServerConnectionManager;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
@@ -991,7 +989,7 @@
memcacheSize += r.memcacheSize.get();
synchronized (r.stores) {
stores += r.stores.size();
- for(Map.Entry<Integer, Store> ee: r.stores.entrySet()) {
+ for(Map.Entry<byte [], Store> ee: r.stores.entrySet()) {
Store store = ee.getValue();
storefiles += store.getStorefilesCount();
try {
@@ -1573,13 +1571,15 @@
return getRegion(regionName).getRegionInfo();
}
- public Cell[] get(final byte [] regionName, final byte [] row,
+ public Cell [] get(final byte [] regionName, final byte [] row,
final byte [] column, final long timestamp, final int numVersions)
throws IOException {
checkOpen();
requestCount.incrementAndGet();
try {
- return getRegion(regionName).get(row, column, timestamp, numVersions);
+ List<KeyValue> results =
+ getRegion(regionName).get(row, column, timestamp, numVersions);
+ return Cell.createSingleCellArray(results);
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
}
@@ -1593,16 +1593,14 @@
requestCount.incrementAndGet();
try {
// convert the columns array into a set so it's easy to check later.
- Set<byte []> columnSet = null;
+ NavigableSet<byte []> columnSet = null;
if (columns != null) {
columnSet = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
columnSet.addAll(Arrays.asList(columns));
}
-
HRegion region = getRegion(regionName);
HbaseMapWritable<byte [], Cell> result =
- region.getFull(row, columnSet,
- ts, numVersions, getLockFromId(lockId));
+ region.getFull(row, columnSet, ts, numVersions, getLockFromId(lockId));
if (result == null || result.isEmpty())
return null;
return new RowResult(row, result);
@@ -1632,9 +1630,9 @@
return rrs.length == 0 ? null : rrs[0];
}
- public RowResult[] next(final long scannerId, int nbRows) throws IOException {
+ public RowResult [] next(final long scannerId, int nbRows) throws IOException {
checkOpen();
- ArrayList<RowResult> resultSets = new ArrayList<RowResult>();
+ List<List<KeyValue>> results = new ArrayList<List<KeyValue>>();
try {
String scannerName = String.valueOf(scannerId);
InternalScanner s = scanners.get(scannerName);
@@ -1642,21 +1640,19 @@
throw new UnknownScannerException("Name: " + scannerName);
}
this.leases.renewLease(scannerName);
- for(int i = 0; i < nbRows; i++) {
+ for (int i = 0; i < nbRows; i++) {
requestCount.incrementAndGet();
// Collect values to be returned here
- HbaseMapWritable<byte [], Cell> values
- = new HbaseMapWritable<byte [], Cell>();
- HStoreKey key = new HStoreKey();
- while (s.next(key, values)) {
- if (values.size() > 0) {
+ List<KeyValue> values = new ArrayList<KeyValue>();
+ while (s.next(values)) {
+ if (!values.isEmpty()) {
// Row has something in it. Return the value.
- resultSets.add(new RowResult(key.getRow(), values));
+ results.add(values);
break;
}
}
}
- return resultSets.toArray(new RowResult[resultSets.size()]);
+ return RowResult.createRowResultArray(results);
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
}
@@ -1670,7 +1666,6 @@
checkOpen();
this.requestCount.incrementAndGet();
HRegion region = getRegion(regionName);
- validateValuesLength(b, region);
try {
cacheFlusher.reclaimMemcacheMemory();
region.batchUpdate(b, getLockFromId(b.getRowLock()));
@@ -1689,7 +1684,6 @@
Integer[] locks = new Integer[b.length];
for (i = 0; i < b.length; i++) {
this.requestCount.incrementAndGet();
- validateValuesLength(b[i], region);
locks[i] = getLockFromId(b[i].getRowLock());
region.batchUpdate(b[i], locks[i]);
}
@@ -1711,7 +1705,6 @@
checkOpen();
this.requestCount.incrementAndGet();
HRegion region = getRegion(regionName);
- validateValuesLength(b, region);
try {
cacheFlusher.reclaimMemcacheMemory();
return region.checkAndSave(b,
@@ -1720,34 +1713,7 @@
throw convertThrowableToIOE(cleanup(t));
}
}
-
-
- /**
- * Utility method to verify values length
- * @param batchUpdate The update to verify
- * @throws IOException Thrown if a value is too long
- */
- private void validateValuesLength(BatchUpdate batchUpdate,
- HRegion region) throws IOException {
- HTableDescriptor desc = region.getTableDesc();
- for (Iterator<BatchOperation> iter =
- batchUpdate.iterator(); iter.hasNext();) {
- BatchOperation operation = iter.next();
- if (operation.getValue() != null) {
- HColumnDescriptor fam =
- desc.getFamily(HStoreKey.getFamily(operation.getColumn()));
- if (fam != null) {
- int maxLength = fam.getMaxValueLength();
- if (operation.getValue().length > maxLength) {
- throw new ValueOverMaxLengthException("Value in column "
- + Bytes.toString(operation.getColumn()) + " is too long. "
- + operation.getValue().length + " instead of " + maxLength);
- }
- }
- }
- }
- }
-
+
//
// remote scanner interface
//
@@ -2132,8 +2098,7 @@
HRegion region = null;
this.lock.readLock().lock();
try {
- Integer key = Integer.valueOf(Bytes.hashCode(regionName));
- region = onlineRegions.get(key);
+ region = onlineRegions.get(Integer.valueOf(Bytes.hashCode(regionName)));
if (region == null) {
throw new NotServingRegionException(regionName);
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java Sun Apr 12 10:39:55 2009
@@ -21,9 +21,9 @@
import java.io.Closeable;
import java.io.IOException;
-import java.util.SortedMap;
-import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.Cell;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
/**
* Internal scanners differ from client-side scanners in that they operate on
@@ -44,13 +44,11 @@
* Grab the next row's worth of values. The scanner will return the most
* recent data value for each row that is not newer than the target time
* passed when the scanner was created.
- * @param key will contain the row and timestamp upon return
- * @param results will contain an entry for each column family member and its
- * value
+ * @param results
* @return true if data was returned
* @throws IOException
*/
- public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+ public boolean next(List<KeyValue> results)
throws IOException;
/**