You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ji...@apache.org on 2008/08/13 02:07:32 UTC
svn commit: r685391 - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/ipc/
src/java/org/apache/hadoop/hbase/master/
src/java/org/apache/hadoop/hbase/regionserver/ sr...
Author: jimk
Date: Tue Aug 12 17:07:29 2008
New Revision: 685391
URL: http://svn.apache.org/viewvc?rev=685391&view=rev
Log:
HBASE-798 Provide Client API to explicitly lock and unlock rows (Jonathan Gray via Jim Kellerman)
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Merge.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Tue Aug 12 17:07:29 2008
@@ -32,6 +32,8 @@
NEW FEATURES
HBASE-787 Postgresql to HBase table replication example (Tim Sell via Stack)
+ HBASE-798 Provide Client API to explicitly lock and unlock rows (Jonathan
+ Gray via Jim Kellerman)
OPTIMIZATIONS
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java Tue Aug 12 17:07:29 2008
@@ -373,7 +373,7 @@
b.delete(COL_STARTCODE);
b.delete(COL_SPLITA);
b.delete(COL_SPLITB);
- root.batchUpdate(b);
+ root.batchUpdate(b,null);
if(LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + regionsToDelete[r]);
@@ -383,7 +383,7 @@
newInfo.setOffline(true);
BatchUpdate b = new BatchUpdate(newRegion.getRegionName());
b.put(COL_REGIONINFO, Writables.getBytes(newInfo));
- root.batchUpdate(b);
+ root.batchUpdate(b,null);
if(LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + newRegion.getRegionName());
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Tue Aug 12 17:07:29 2008
@@ -640,11 +640,32 @@
public RowResult getRow(final byte [] row, final byte [][] columns,
final long ts)
throws IOException {
+ return getRow(row,columns,ts,null);
+ }
+
+ /**
+ * Get selected columns for the specified row at a specified timestamp
+ * using existing row lock.
+ *
+ * @param row row key
+ * @param columns Array of column names and families you want to retrieve.
+ * @param ts timestamp
+ * @param rl row lock
+ * @return RowResult is empty if row does not exist.
+ * @throws IOException
+ */
+ public RowResult getRow(final byte [] row, final byte [][] columns,
+ final long ts, final RowLock rl)
+ throws IOException {
return connection.getRegionServerWithRetries(
new ServerCallable<RowResult>(connection, tableName, row) {
public RowResult call() throws IOException {
+ long lockId = -1L;
+ if(rl != null) {
+ lockId = rl.getLockId();
+ }
return server.getRow(location.getRegionInfo().getRegionName(), row,
- columns, ts);
+ columns, ts, lockId);
}
}
);
@@ -1104,15 +1125,35 @@
*/
public void deleteAll(final byte [] row, final byte [] column, final long ts)
throws IOException {
+ deleteAll(row,column,ts,null);
+ }
+
+ /**
+ * Delete all cells that match the passed row and column and whose
+ * timestamp is equal-to or older than the passed timestamp, using an
+ * existing row lock.
+ * @param row Row to update
+ * @param column name of column whose value is to be deleted
+ * @param ts Delete all cells of the same timestamp or older.
+ * @param rl Existing row lock
+ * @throws IOException
+ */
+ public void deleteAll(final byte [] row, final byte [] column, final long ts,
+ final RowLock rl)
+ throws IOException {
connection.getRegionServerWithRetries(
new ServerCallable<Boolean>(connection, tableName, row) {
public Boolean call() throws IOException {
+ long lockId = -1L;
+ if(rl != null) {
+ lockId = rl.getLockId();
+ }
if (column != null) {
this.server.deleteAll(location.getRegionInfo().getRegionName(),
- row, column, ts);
+ row, column, ts, lockId);
} else {
this.server.deleteAll(location.getRegionInfo().getRegionName(),
- row, ts);
+ row, ts, lockId);
}
return null;
}
@@ -1161,11 +1202,31 @@
public void deleteFamily(final byte [] row, final byte [] family,
final long timestamp)
throws IOException {
+ deleteFamily(row,family,timestamp,null);
+ }
+
+ /**
+ * Delete all cells for a row with matching column family with timestamps
+ * less than or equal to <i>timestamp</i>, using existing row lock.
+ *
+ * @param row The row to operate on
+ * @param family The column family to match
+ * @param timestamp Timestamp to match
+ * @param rl Existing row lock
+ * @throws IOException
+ */
+ public void deleteFamily(final byte [] row, final byte [] family,
+ final long timestamp, final RowLock rl)
+ throws IOException {
connection.getRegionServerWithRetries(
new ServerCallable<Boolean>(connection, tableName, row) {
public Boolean call() throws IOException {
+ long lockId = -1L;
+ if(rl != null) {
+ lockId = rl.getLockId();
+ }
server.deleteFamily(location.getRegionInfo().getRegionName(), row,
- family, timestamp);
+ family, timestamp, lockId);
return null;
}
}
@@ -1179,11 +1240,27 @@
*/
public synchronized void commit(final BatchUpdate batchUpdate)
throws IOException {
+ commit(batchUpdate,null);
+ }
+
+ /**
+ * Commit a BatchUpdate to the table using existing row lock.
+ * @param batchUpdate
+ * @param rl Existing row lock
+ * @throws IOException
+ */
+ public synchronized void commit(final BatchUpdate batchUpdate,
+ final RowLock rl)
+ throws IOException {
connection.getRegionServerWithRetries(
new ServerCallable<Boolean>(connection, tableName, batchUpdate.getRow()) {
public Boolean call() throws IOException {
+ long lockId = -1L;
+ if(rl != null) {
+ lockId = rl.getLockId();
+ }
server.batchUpdate(location.getRegionInfo().getRegionName(),
- batchUpdate);
+ batchUpdate, lockId);
return null;
}
}
@@ -1198,7 +1275,45 @@
public synchronized void commit(final List<BatchUpdate> batchUpdates)
throws IOException {
for (BatchUpdate batchUpdate : batchUpdates)
- commit(batchUpdate);
+ commit(batchUpdate,null);
+ }
+
+ /**
+ * Obtain a row lock
+ * @param row The row to lock
+ * @return rowLock RowLock containing row and lock id
+ * @throws IOException
+ */
+ public RowLock lockRow(final byte [] row)
+ throws IOException {
+ return connection.getRegionServerWithRetries(
+ new ServerCallable<RowLock>(connection, tableName, row) {
+ public RowLock call() throws IOException {
+ long lockId =
+ server.lockRow(location.getRegionInfo().getRegionName(), row);
+ RowLock rowLock = new RowLock(row,lockId);
+ return rowLock;
+ }
+ }
+ );
+ }
+
+ /**
+ * Release a row lock
+ * @param rl The row lock to release
+ * @throws IOException
+ */
+ public void unlockRow(final RowLock rl)
+ throws IOException {
+ connection.getRegionServerWithRetries(
+ new ServerCallable<Boolean>(connection, tableName, rl.getRow()) {
+ public Boolean call() throws IOException {
+ server.unlockRow(location.getRegionInfo().getRegionName(),
+ rl.getLockId());
+ return null;
+ }
+ }
+ );
}
/**
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Aug 12 17:07:29 2008
@@ -111,11 +111,12 @@
*
* @param regionName region name
* @param row row key
+ * @param lockId lock id
* @return map of values
* @throws IOException
*/
public RowResult getRow(final byte [] regionName, final byte [] row,
- final byte[][] columns, final long ts)
+ final byte[][] columns, final long ts, final long lockId)
throws IOException;
/**
@@ -123,9 +124,11 @@
*
* @param regionName name of the region to update
* @param b BatchUpdate
+ * @param lockId lock id
* @throws IOException
*/
- public void batchUpdate(final byte [] regionName, final BatchUpdate b)
+ public void batchUpdate(final byte [] regionName, final BatchUpdate b,
+ final long lockId)
throws IOException;
/**
@@ -136,10 +139,11 @@
* @param row row key
* @param column column key
* @param timestamp Delete all entries that have this timestamp or older
+ * @param lockId lock id
* @throws IOException
*/
public void deleteAll(byte [] regionName, byte [] row, byte [] column,
- long timestamp)
+ long timestamp, long lockId)
throws IOException;
/**
@@ -149,9 +153,11 @@
* @param regionName region name
* @param row row key
* @param timestamp Delete all entries that have this timestamp or older
+ * @param lockId lock id
* @throws IOException
*/
- public void deleteAll(byte [] regionName, byte [] row, long timestamp)
+ public void deleteAll(byte [] regionName, byte [] row, long timestamp,
+ long lockId)
throws IOException;
/**
@@ -162,9 +168,10 @@
* @param row The row to operate on
* @param family The column family to match
* @param timestamp Timestamp to match
+ * @param lockId lock id
*/
public void deleteFamily(byte [] regionName, byte [] row, byte [] family,
- long timestamp)
+ long timestamp, long lockId)
throws IOException;
@@ -207,4 +214,24 @@
* @throws IOException
*/
public void close(long scannerId) throws IOException;
+
+ /**
+ * Opens a remote row lock.
+ *
+ * @param regionName name of region
+ * @param row row to lock
+ * @return lockId lock identifier
+ * @throws IOException
+ */
+ public long lockRow(final byte [] regionName, final byte [] row)
+ throws IOException;
+
+ /**
+ * Releases a remote row lock.
+ *
+ * @param lockId the lock id returned by lockRow
+ * @throws IOException
+ */
+ public void unlockRow(final byte [] regionName, final long lockId)
+ throws IOException;
}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java Tue Aug 12 17:07:29 2008
@@ -332,7 +332,7 @@
BatchUpdate b = new BatchUpdate(parent);
b.delete(splitColumn);
- srvr.batchUpdate(metaRegionName, b);
+ srvr.batchUpdate(metaRegionName, b, -1L);
return result;
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java Tue Aug 12 17:07:29 2008
@@ -91,7 +91,7 @@
updateRegionInfo(b, i);
b.delete(COL_SERVER);
b.delete(COL_STARTCODE);
- server.batchUpdate(m.getRegionName(), b);
+ server.batchUpdate(m.getRegionName(), b, -1L);
if (LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.getRegionNameAsString());
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java Tue Aug 12 17:07:29 2008
@@ -52,7 +52,7 @@
throws IOException {
BatchUpdate b = new BatchUpdate(i.getRegionName());
b.put(COL_REGIONINFO, Writables.getBytes(i));
- server.batchUpdate(regionName, b);
+ server.batchUpdate(regionName, b, -1L);
if (LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + i.getRegionNameAsString());
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java Tue Aug 12 17:07:29 2008
@@ -52,7 +52,7 @@
throws IOException {
BatchUpdate b = new BatchUpdate(i.getRegionName());
b.put(COL_REGIONINFO, Writables.getBytes(i));
- server.batchUpdate(regionName, b);
+ server.batchUpdate(regionName, b, -1L);
LOG.debug("updated HTableDescriptor for region " + i.getRegionNameAsString());
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java Tue Aug 12 17:07:29 2008
@@ -83,7 +83,7 @@
BatchUpdate b = new BatchUpdate(regionInfo.getRegionName());
b.put(COL_SERVER, Bytes.toBytes(serverAddress.toString()));
b.put(COL_STARTCODE, startCode);
- server.batchUpdate(metaRegionName, b);
+ server.batchUpdate(metaRegionName, b, -1L);
if (!this.historian.isOnline()) {
// This is safest place to do the onlining of the historian in
// the master. When we get to here, we know there is a .META.
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Aug 12 17:07:29 2008
@@ -560,7 +560,7 @@
byte [] regionName = region.getRegionName();
BatchUpdate b = new BatchUpdate(regionName);
b.put(COL_REGIONINFO, Writables.getBytes(info));
- server.batchUpdate(metaRegionName, b);
+ server.batchUpdate(metaRegionName, b, -1L);
// 4. Close the new region to flush it to disk. Close its log file too.
region.close();
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Aug 12 17:07:29 2008
@@ -1,4 +1,4 @@
-/**
+ /**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -1170,7 +1170,7 @@
* @throws IOException
*/
public Map<byte [], Cell> getFull(final byte [] row,
- final Set<byte []> columns, final long ts)
+ final Set<byte []> columns, final long ts, final Integer lockid)
throws IOException {
// Check columns passed
if (columns != null) {
@@ -1179,7 +1179,7 @@
}
}
HStoreKey key = new HStoreKey(row, ts);
- Integer lid = obtainRowLock(row);
+ Integer lid = getLock(lockid,row);
HashSet<HStore> storeSet = new HashSet<HStore>();
try {
TreeMap<byte [], Cell> result =
@@ -1215,7 +1215,7 @@
return result;
} finally {
- releaseRowLock(lid);
+ if(lockid == null) releaseRowLock(lid);
}
}
@@ -1347,7 +1347,7 @@
* @param b
* @throws IOException
*/
- public void batchUpdate(BatchUpdate b)
+ public void batchUpdate(BatchUpdate b, Integer lockid)
throws IOException {
checkReadOnly();
@@ -1363,7 +1363,8 @@
// See HRegionServer#RegionListener for how the expire on HRegionServer
// invokes a HRegion#abort.
byte [] row = b.getRow();
- Integer lid = obtainRowLock(row);
+ // If we did not pass an existing row lock, obtain a new one
+ Integer lid = getLock(lockid,row);
long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ?
System.currentTimeMillis() : b.getTimestamp();
try {
@@ -1408,7 +1409,7 @@
this.targetColumns.remove(Long.valueOf(lid));
throw e;
} finally {
- releaseRowLock(lid);
+ if(lockid == null) releaseRowLock(lid);
}
}
@@ -1458,17 +1459,19 @@
* @param row
* @param column
* @param ts Delete all entries that have this timestamp or older
+ * @param lockid Row lock
* @throws IOException
*/
- public void deleteAll(final byte [] row, final byte [] column, final long ts)
+ public void deleteAll(final byte [] row, final byte [] column, final long ts,
+ final Integer lockid)
throws IOException {
checkColumn(column);
checkReadOnly();
- Integer lid = obtainRowLock(row);
+ Integer lid = getLock(lockid,row);
try {
deleteMultiple(row, column, ts, ALL_VERSIONS);
} finally {
- releaseRowLock(lid);
+ if(lockid == null) releaseRowLock(lid);
}
}
@@ -1476,12 +1479,14 @@
* Delete all cells of the same age as the passed timestamp or older.
* @param row
* @param ts Delete all entries that have this timestamp or older
+ * @param lockid Row lock
* @throws IOException
*/
- public void deleteAll(final byte [] row, final long ts)
+ public void deleteAll(final byte [] row, final long ts,
+ final Integer lockid)
throws IOException {
checkReadOnly();
- Integer lid = obtainRowLock(row);
+ Integer lid = getLock(lockid,row);
try {
for (HStore store : stores.values()){
List<HStoreKey> keys = store.getKeys(new HStoreKey(row, ts),
@@ -1493,7 +1498,7 @@
update(edits);
}
} finally {
- releaseRowLock(lid);
+ if(lockid == null) releaseRowLock(lid);
}
}
@@ -1504,12 +1509,14 @@
* @param row The row to operate on
* @param family The column family to match
* @param timestamp Timestamp to match
+ * @param lockid Row lock
* @throws IOException
*/
- public void deleteFamily(byte [] row, byte [] family, long timestamp)
+ public void deleteFamily(byte [] row, byte [] family, long timestamp,
+ final Integer lockid)
throws IOException{
checkReadOnly();
- Integer lid = obtainRowLock(row);
+ Integer lid = getLock(lockid,row);
try {
// find the HStore for the column family
HStore store = getStore(family);
@@ -1522,7 +1529,7 @@
}
update(edits);
} finally {
- releaseRowLock(lid);
+ if(lockid == null) releaseRowLock(lid);
}
}
@@ -1552,7 +1559,7 @@
update(edits);
}
}
-
+
/**
* @throws IOException Throws exception if region is in read-only mode.
*/
@@ -1778,6 +1785,41 @@
}
}
+ /**
+ * See if row is currently locked.
+ * @param lockid
+ * @return boolean
+ */
+ private boolean isRowLocked(final Integer lockid) {
+ synchronized (locksToRows) {
+ if(locksToRows.containsKey(lockid)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Returns existing row lock if found, otherwise
+ * obtains a new row lock and returns it.
+ * @param lockid
+ * @return lockid
+ */
+ private Integer getLock(Integer lockid, byte [] row)
+ throws IOException {
+ Integer lid = null;
+ if(lockid == null) {
+ lid = obtainRowLock(row);
+ } else {
+ if(!isRowLocked(lockid)) {
+ throw new IOException("Invalid row lock");
+ }
+ lid = lockid;
+ }
+ return lid;
+ }
+
private void waitOnRowLocks() {
synchronized (locksToRows) {
while (this.locksToRows.size() > 0) {
@@ -2134,7 +2176,8 @@
public static void removeRegionFromMETA(final HRegionInterface srvr,
final byte [] metaRegionName, final byte [] regionName)
throws IOException {
- srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP);
+ srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP,
+ (long)-1L);
}
/**
@@ -2155,7 +2198,7 @@
b.delete(COL_STARTCODE);
// If carrying splits, they'll be in place when we show up on new
// server.
- srvr.batchUpdate(metaRegionName, b);
+ srvr.batchUpdate(metaRegionName, b, (long)-1L);
}
/**
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Aug 12 17:07:29 2008
@@ -69,6 +69,7 @@
import org.apache.hadoop.hbase.RegionHistorian;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchOperation;
@@ -1048,7 +1049,7 @@
/** {@inheritDoc} */
public RowResult getRow(final byte [] regionName, final byte [] row,
- final byte [][] columns, final long ts)
+ final byte [][] columns, final long ts, final long lockId)
throws IOException {
checkOpen();
requestCount.incrementAndGet();
@@ -1061,7 +1062,8 @@
}
HRegion region = getRegion(regionName);
- Map<byte [], Cell> map = region.getFull(row, columnSet, ts);
+ Map<byte [], Cell> map = region.getFull(row, columnSet, ts,
+ getLockFromId(lockId));
HbaseMapWritable<byte [], Cell> result =
new HbaseMapWritable<byte [], Cell>();
result.putAll(map);
@@ -1126,7 +1128,7 @@
}
/** {@inheritDoc} */
- public void batchUpdate(final byte [] regionName, BatchUpdate b)
+ public void batchUpdate(final byte [] regionName, BatchUpdate b, long lockId)
throws IOException {
checkOpen();
this.requestCount.incrementAndGet();
@@ -1134,7 +1136,7 @@
validateValuesLength(b, region);
try {
cacheFlusher.reclaimMemcacheMemory();
- region.batchUpdate(b);
+ region.batchUpdate(b, getLockFromId(lockId));
} catch (OutOfMemoryError error) {
abort();
LOG.fatal("Ran out of memory", error);
@@ -1239,7 +1241,7 @@
}
Map<String, InternalScanner> scanners =
- Collections.synchronizedMap(new HashMap<String, InternalScanner>());
+ new ConcurrentHashMap<String, InternalScanner>();
/**
* Instantiated as a scanner lease.
@@ -1275,26 +1277,157 @@
/** {@inheritDoc} */
public void deleteAll(final byte [] regionName, final byte [] row,
- final byte [] column, final long timestamp)
+ final byte [] column, final long timestamp, final long lockId)
throws IOException {
HRegion region = getRegion(regionName);
- region.deleteAll(row, column, timestamp);
+ region.deleteAll(row, column, timestamp, getLockFromId(lockId));
}
/** {@inheritDoc} */
public void deleteAll(final byte [] regionName, final byte [] row,
- final long timestamp)
+ final long timestamp, final long lockId)
throws IOException {
HRegion region = getRegion(regionName);
- region.deleteAll(row, timestamp);
+ region.deleteAll(row, timestamp, getLockFromId(lockId));
}
/** {@inheritDoc} */
public void deleteFamily(byte [] regionName, byte [] row, byte [] family,
- long timestamp) throws IOException{
- getRegion(regionName).deleteFamily(row, family, timestamp);
+ long timestamp, final long lockId)
+ throws IOException{
+ getRegion(regionName).deleteFamily(row, family, timestamp,
+ getLockFromId(lockId));
}
+ /** {@inheritDoc} */
+ public long lockRow(byte [] regionName, byte [] row)
+ throws IOException {
+ checkOpen();
+ NullPointerException npe = null;
+ if(regionName == null) {
+ npe = new NullPointerException("regionName is null");
+ } else if(row == null) {
+ npe = new NullPointerException("row to lock is null");
+ }
+ if(npe != null) {
+ IOException io = new IOException("Invalid arguments to lockRow");
+ io.initCause(npe);
+ throw io;
+ }
+ requestCount.incrementAndGet();
+ try {
+ HRegion region = getRegion(regionName);
+ Integer r = region.obtainRowLock(row);
+ long lockId = addRowLock(r,region);
+ LOG.debug("Row lock " + lockId + " explicitly acquired by client");
+ return lockId;
+ } catch (IOException e) {
+ LOG.error("Error obtaining row lock (fsOk: " + this.fsOk + ")",
+ RemoteExceptionHandler.checkIOException(e));
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException {
+ long lockId = -1L;
+ lockId = rand.nextLong();
+ String lockName = String.valueOf(lockId);
+ synchronized(rowlocks) {
+ rowlocks.put(lockName, r);
+ }
+ this.leases.
+ createLease(lockName, new RowLockListener(lockName, region));
+ return lockId;
+ }
+
+ /**
+ * Method to get the Integer lock identifier used internally
+ * from the long lock identifier used by the client.
+ * @param lockId long row lock identifier from client
+ * @return intId Integer row lock used internally in HRegion
+ * @throws IOException Thrown if this is not a valid client lock id.
+ */
+ private Integer getLockFromId(long lockId)
+ throws IOException {
+ if(lockId == -1L) {
+ return null;
+ }
+ String lockName = String.valueOf(lockId);
+ Integer rl = null;
+ synchronized(rowlocks) {
+ rl = rowlocks.get(lockName);
+ }
+ if(rl == null) {
+ throw new IOException("Invalid row lock");
+ }
+ this.leases.renewLease(lockName);
+ return rl;
+ }
+
+ /** {@inheritDoc} */
+ public void unlockRow(byte [] regionName, long lockId)
+ throws IOException {
+ checkOpen();
+ NullPointerException npe = null;
+ if(regionName == null) {
+ npe = new NullPointerException("regionName is null");
+ } else if(lockId == -1L) {
+ npe = new NullPointerException("lockId is null");
+ }
+ if(npe != null) {
+ IOException io = new IOException("Invalid arguments to unlockRow");
+ io.initCause(npe);
+ throw io;
+ }
+ requestCount.incrementAndGet();
+ try {
+ HRegion region = getRegion(regionName);
+ String lockName = String.valueOf(lockId);
+ Integer r = null;
+ synchronized(rowlocks) {
+ r = rowlocks.remove(lockName);
+ }
+ if(r == null) {
+ throw new UnknownRowLockException(lockName);
+ }
+ region.releaseRowLock(r);
+ this.leases.cancelLease(lockName);
+ LOG.debug("Row lock " + lockId + " has been explicitly released by client");
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ Map<String, Integer> rowlocks =
+ new ConcurrentHashMap<String, Integer>();
+
+ /**
+ * Instantiated as a row lock lease.
+ * If the lease times out, the row lock is released
+ */
+ private class RowLockListener implements LeaseListener {
+ private final String lockName;
+ private final HRegion region;
+
+ RowLockListener(final String lockName, final HRegion region) {
+ this.lockName = lockName;
+ this.region = region;
+ }
+
+ /** {@inheritDoc} */
+ public void leaseExpired() {
+ LOG.info("Row Lock " + this.lockName + " lease expired");
+ Integer r = null;
+ synchronized(rowlocks) {
+ r = rowlocks.remove(this.lockName);
+ }
+ if(r != null) {
+ region.releaseRowLock(r);
+ }
+ }
+ }
/**
* @return Info on this server.
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Merge.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Merge.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Merge.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Merge.java Tue Aug 12 17:07:29 2008
@@ -308,7 +308,7 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Removing region: " + regioninfo + " from " + meta);
}
- meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis());
+ meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis(), null);
}
/*
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java Tue Aug 12 17:07:29 2008
@@ -407,7 +407,7 @@
}
BatchUpdate b = new BatchUpdate(hri.getRegionName());
b.put(HConstants.COL_REGIONINFO, Writables.getBytes(hri));
- r.batchUpdate(b);
+ r.batchUpdate(b, null);
if (LOG.isDebugEnabled()) {
HRegionInfo h = Writables.getHRegionInfoOrNull(
r.get(hri.getRegionName(), HConstants.COL_REGIONINFO).getValue());
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java Tue Aug 12 17:07:29 2008
@@ -2050,7 +2050,8 @@
public static void removeRegionFromMETA(final HRegionInterface srvr,
final byte [] metaRegionName, final byte [] regionName)
throws IOException {
- srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP);
+ srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP,
+ -1L);
}
/**
@@ -2071,7 +2072,7 @@
b.delete(COL_STARTCODE);
// If carrying splits, they'll be in place when we show up on new
// server.
- srvr.batchUpdate(metaRegionName, b);
+ srvr.batchUpdate(metaRegionName, b, -1L);
}
/**