You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/01/17 20:38:13 UTC
svn commit: r1232551 - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/replication/regionserver/
test/java/org/apache/hadoop/hbase/client/ test/java/org/ap...
Author: larsh
Date: Tue Jan 17 19:38:13 2012
New Revision: 1232551
URL: http://svn.apache.org/viewvc?rev=1232551&view=rev
Log:
HBASE-5203 Group atomic put/delete operation into a single WALEdit to handle region server failures. (Lars H)
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1232551&r1=1232550&r2=1232551&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Delete.java Tue Jan 17 19:38:13 2012
@@ -138,6 +138,35 @@ public class Delete extends Mutation
}
/**
+ * Advanced use only.
+ * Add an existing delete marker to this Delete object.
+ * @param kv An existing KeyValue of type "delete".
+ * @return this for invocation chaining
+ * @throws IOException
+ */
+ public Delete addDeleteMarker(KeyValue kv) throws IOException {
+ if (!kv.isDelete()) {
+ throw new IOException("The recently added KeyValue is not of type "
+ + "delete. Rowkey: " + Bytes.toStringBinary(this.row));
+ }
+ if (Bytes.compareTo(this.row, 0, row.length, kv.getBuffer(),
+ kv.getRowOffset(), kv.getRowLength()) != 0) {
+ throw new IOException("The row in the recently added KeyValue "
+ + Bytes.toStringBinary(kv.getBuffer(), kv.getRowOffset(),
+ kv.getRowLength()) + " doesn't match the original one "
+ + Bytes.toStringBinary(this.row));
+ }
+ byte [] family = kv.getFamily();
+ List<KeyValue> list = familyMap.get(family);
+ if (list == null) {
+ list = new ArrayList<KeyValue>();
+ }
+ list.add(kv);
+ familyMap.put(family, list);
+ return this;
+ }
+
+ /**
* Delete all versions of all columns of the specified family.
* <p>
* Overrides previous calls to deleteColumn and deleteColumns for the
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1232551&r1=1232550&r2=1232551&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Jan 17 19:38:13 2012
@@ -1686,7 +1686,7 @@ public class HRegion implements HeapSize
try {
// All edits for the given row (across all column families) must happen atomically.
prepareDelete(delete);
- internalDelete(delete, delete.getClusterId(), writeToWAL, null, null);
+ internalDelete(delete, delete.getClusterId(), writeToWAL);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@@ -1707,26 +1707,77 @@ public class HRegion implements HeapSize
delete.setFamilyMap(familyMap);
delete.setClusterId(clusterId);
delete.setWriteToWAL(writeToWAL);
- internalDelete(delete, clusterId, writeToWAL, null, null);
+ internalDelete(delete, clusterId, writeToWAL);
+ }
+
+ /**
+ * Setup a Delete object with correct timestamps.
+ * Caller should the row and region locks.
+ * @param delete
+ * @param now
+ * @throws IOException
+ */
+ private void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
+ throws IOException {
+ Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
+ for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
+
+ byte[] family = e.getKey();
+ List<KeyValue> kvs = e.getValue();
+ Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+
+ for (KeyValue kv: kvs) {
+ // Check if time is LATEST, change to time of most recent addition if so
+ // This is expensive.
+ if (kv.isLatestTimestamp() && kv.isDeleteType()) {
+ byte[] qual = kv.getQualifier();
+ if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
+
+ Integer count = kvCount.get(qual);
+ if (count == null) {
+ kvCount.put(qual, 1);
+ } else {
+ kvCount.put(qual, count + 1);
+ }
+ count = kvCount.get(qual);
+
+ Get get = new Get(kv.getRow());
+ get.setMaxVersions(count);
+ get.addColumn(family, qual);
+
+ List<KeyValue> result = get(get, false);
+
+ if (result.size() < count) {
+ // Nothing to delete
+ kv.updateLatestStamp(byteNow);
+ continue;
+ }
+ if (result.size() > count) {
+ throw new RuntimeException("Unexpected size: " + result.size());
+ }
+ KeyValue getkv = result.get(count - 1);
+ Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
+ getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+ } else {
+ kv.updateLatestStamp(byteNow);
+ }
+ }
+ }
}
/**
* @param delete The Delete command
- * @param familyMap map of family to edits for the given family.
+ * @param clusterId UUID of the originating cluster (for replication).
* @param writeToWAL
- * @param writeEntry Optional mvcc write point to use
- * @param walEdit Optional walEdit to use. A non-null walEdit indicates
- * that the coprocessor hooks are run by the caller
* @throws IOException
*/
private void internalDelete(Delete delete, UUID clusterId,
- boolean writeToWAL, MultiVersionConsistencyControl.WriteEntry writeEntry,
- WALEdit walEdit) throws IOException {
+ boolean writeToWAL) throws IOException {
Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
- WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
+ WALEdit walEdit = new WALEdit();
/* Run coprocessor pre hook outside of locks to avoid deadlock */
- if (coprocessorHost != null && walEdit == null) {
- if (coprocessorHost.preDelete(delete, localWalEdit, writeToWAL)) {
+ if (coprocessorHost != null) {
+ if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) {
return;
}
}
@@ -1737,49 +1788,7 @@ public class HRegion implements HeapSize
updatesLock.readLock().lock();
try {
- for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
-
- byte[] family = e.getKey();
- List<KeyValue> kvs = e.getValue();
- Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-
- for (KeyValue kv: kvs) {
- // Check if time is LATEST, change to time of most recent addition if so
- // This is expensive.
- if (kv.isLatestTimestamp() && kv.isDeleteType()) {
- byte[] qual = kv.getQualifier();
- if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
-
- Integer count = kvCount.get(qual);
- if (count == null) {
- kvCount.put(qual, 1);
- } else {
- kvCount.put(qual, count + 1);
- }
- count = kvCount.get(qual);
-
- Get get = new Get(kv.getRow());
- get.setMaxVersions(count);
- get.addColumn(family, qual);
-
- List<KeyValue> result = get(get, false);
-
- if (result.size() < count) {
- // Nothing to delete
- kv.updateLatestStamp(byteNow);
- continue;
- }
- if (result.size() > count) {
- throw new RuntimeException("Unexpected size: " + result.size());
- }
- KeyValue getkv = result.get(count - 1);
- Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
- getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
- } else {
- kv.updateLatestStamp(byteNow);
- }
- }
- }
+ prepareDeleteTimestamps(delete, byteNow);
if (writeToWAL) {
// write/sync to WAL should happen before we touch memstore.
@@ -1790,21 +1799,21 @@ public class HRegion implements HeapSize
//
// bunch up all edits across all column families into a
// single WALEdit.
- addFamilyMapToWALEdit(familyMap, localWalEdit);
+ addFamilyMapToWALEdit(familyMap, walEdit);
this.log.append(regionInfo, this.htableDescriptor.getName(),
- localWalEdit, clusterId, now, this.htableDescriptor);
+ walEdit, clusterId, now, this.htableDescriptor);
}
// Now make changes to the memstore.
- long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
+ long addedSize = applyFamilyMapToMemstore(familyMap, null);
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
} finally {
this.updatesLock.readLock().unlock();
}
// do after lock
- if (coprocessorHost != null && walEdit == null) {
- coprocessorHost.postDelete(delete, localWalEdit, writeToWAL);
+ if (coprocessorHost != null) {
+ coprocessorHost.postDelete(delete, walEdit, writeToWAL);
}
final long after = EnvironmentEdgeManager.currentTimeMillis();
final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
@@ -1876,7 +1885,7 @@ public class HRegion implements HeapSize
try {
// All edits for the given row (across all column families) must happen atomically.
- internalPut(put, put.getClusterId(), writeToWAL, null, null);
+ internalPut(put, put.getClusterId(), writeToWAL);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@@ -2305,13 +2314,11 @@ public class HRegion implements HeapSize
// originating cluster. A slave cluster receives the result as a Put
// or Delete
if (isPut) {
- internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL,
- null, null);
+ internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
} else {
Delete d = (Delete)w;
prepareDelete(d);
- internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL, null,
- null);
+ internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
}
return true;
}
@@ -2406,26 +2413,23 @@ public class HRegion implements HeapSize
p.setFamilyMap(familyMap);
p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
p.setWriteToWAL(true);
- this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true, null, null);
+ this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true);
}
/**
* Add updates first to the hlog (if writeToWal) and then add values to memstore.
* Warning: Assumption is caller has lock on passed in row.
* @param put The Put command
+ * @param clusterId UUID of the originating cluster (for replication).
* @param writeToWAL if true, then we should write to the log
- * @param writeEntry Optional mvcc write point to use
- * @param walEdit Optional walEdit to use. A non-null walEdit indicates
- * that the coprocessor hooks are run by the caller
* @throws IOException
*/
- private void internalPut(Put put, UUID clusterId, boolean writeToWAL,
- MultiVersionConsistencyControl.WriteEntry writeEntry, WALEdit walEdit) throws IOException {
+ private void internalPut(Put put, UUID clusterId, boolean writeToWAL) throws IOException {
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
- WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
+ WALEdit walEdit = new WALEdit();
/* run pre put hook outside of lock to avoid deadlock */
- if (coprocessorHost != null && walEdit == null) {
- if (coprocessorHost.prePut(put, localWalEdit, writeToWAL)) {
+ if (coprocessorHost != null) {
+ if (coprocessorHost.prePut(put, walEdit, writeToWAL)) {
return;
}
}
@@ -2445,19 +2449,19 @@ public class HRegion implements HeapSize
// for some reason fail to write/sync to commit log, the memstore
// will contain uncommitted transactions.
if (writeToWAL) {
- addFamilyMapToWALEdit(familyMap, localWalEdit);
+ addFamilyMapToWALEdit(familyMap, walEdit);
this.log.append(regionInfo, this.htableDescriptor.getName(),
- localWalEdit, clusterId, now, this.htableDescriptor);
+ walEdit, clusterId, now, this.htableDescriptor);
}
- long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
+ long addedSize = applyFamilyMapToMemstore(familyMap, null);
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
} finally {
this.updatesLock.readLock().unlock();
}
- if (coprocessorHost != null && walEdit == null) {
- coprocessorHost.postPut(put, localWalEdit, writeToWAL);
+ if (coprocessorHost != null) {
+ coprocessorHost.postPut(put, walEdit, writeToWAL);
}
// do after lock
@@ -4140,92 +4144,107 @@ public class HRegion implements HeapSize
return results;
}
- public int mutateRow(RowMutation rm,
+ public void mutateRow(RowMutation rm,
Integer lockid) throws IOException {
+ boolean flush = false;
startRegionOperation();
- List<WALEdit> walEdits = new ArrayList<WALEdit>(rm.getMutations().size());
+ Integer lid = null;
+ try {
+ // 1. run all pre-hooks before the atomic operation
+ // if any pre hook indicates "bypass", bypass the entire operation
- // 1. run all pre-hooks before the atomic operation
- // if any pre hook indicates "bypass", bypass the entire operation
- // Note that this requires creating the WALEdits here and passing
- // them to the actual Put/Delete operations.
- for (Mutation m : rm.getMutations()) {
+ // one WALEdit is used for all edits.
WALEdit walEdit = new WALEdit();
- walEdits.add(walEdit);
- if (coprocessorHost == null) {
- continue;
- }
- if (m instanceof Put) {
- if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
- // by pass everything
- return 0;
- }
- } else if (m instanceof Delete) {
- Delete d = (Delete) m;
- prepareDelete(d);
- if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
- // by pass everything
- return 0;
+ if (coprocessorHost != null) {
+ for (Mutation m : rm.getMutations()) {
+ if (m instanceof Put) {
+ if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
+ // by pass everything
+ return;
+ }
+ } else if (m instanceof Delete) {
+ Delete d = (Delete) m;
+ prepareDelete(d);
+ if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
+ // by pass everything
+ return;
+ }
+ }
}
}
- }
- // 2. acquire the row lock
- Integer lid = getLock(lockid, rm.getRow(), true);
+ // 2. acquire the row lock
+ lid = getLock(lockid, rm.getRow(), true);
- // 3. acquire the region lock
- this.updatesLock.readLock().lock();
+ // 3. acquire the region lock
+ this.updatesLock.readLock().lock();
- // 4. Get a mvcc write number
- MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
- try {
- int i = 0;
- // 5. Perform the actual mutations
- for (Mutation m : rm.getMutations()) {
- if (m instanceof Put) {
- internalPut((Put) m, HConstants.DEFAULT_CLUSTER_ID,
- m.getWriteToWAL(), w, walEdits.get(i));
- } else if (m instanceof Delete) {
- Delete d = (Delete) m;
- prepareDelete(d);
- internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, d.getWriteToWAL(),
- w, walEdits.get(i));
- } else {
- throw new DoNotRetryIOException(
- "Action must be Put or Delete. But was: "
- + m.getClass().getName());
- }
- i++;
- }
- return i;
- } finally {
- // 6. roll mvcc forward
- mvcc.completeMemstoreInsert(w);
- // 7. release region lock
- this.updatesLock.readLock().unlock();
+ // 4. Get a mvcc write number
+ MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
+
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ byte[] byteNow = Bytes.toBytes(now);
try {
- // 8. run all coprocessor post hooks
- if (coprocessorHost != null) {
- int i = 0;
- for (Mutation m : rm.getMutations()) {
- if (m instanceof Put) {
- coprocessorHost.postPut((Put) m, walEdits.get(i),
- m.getWriteToWAL());
- } else if (m instanceof Delete) {
- coprocessorHost.postDelete((Delete) m, walEdits.get(i),
- m.getWriteToWAL());
- }
- i++;
+ // 5. Check mutations and apply edits to a single WALEdit
+ for (Mutation m : rm.getMutations()) {
+ if (m instanceof Put) {
+ Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
+ checkFamilies(familyMap.keySet());
+ checkTimestamps(familyMap, now);
+ updateKVTimestamps(familyMap.values(), byteNow);
+ } else if (m instanceof Delete) {
+ Delete d = (Delete) m;
+ prepareDelete(d);
+ prepareDeleteTimestamps(d, byteNow);
+ } else {
+ throw new DoNotRetryIOException(
+ "Action must be Put or Delete. But was: "
+ + m.getClass().getName());
+ }
+ if (m.getWriteToWAL()) {
+ addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
}
}
+
+ // 6. append/sync all edits at once
+ // TODO: Do batching as in doMiniBatchPut
+ this.log.append(regionInfo, this.htableDescriptor.getName(), walEdit,
+ HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
+
+ // 7. apply to memstore
+ long addedSize = 0;
+ for (Mutation m : rm.getMutations()) {
+ addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
+ }
+ flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
} finally {
- if (lid != null) {
- // 9. release the row lock
- releaseRowLock(lid);
+ // 8. roll mvcc forward
+ mvcc.completeMemstoreInsert(w);
+
+ // 9. release region lock
+ this.updatesLock.readLock().unlock();
+ }
+ // 10. run all coprocessor post hooks, after region lock is released
+ if (coprocessorHost != null) {
+ for (Mutation m : rm.getMutations()) {
+ if (m instanceof Put) {
+ coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
+ } else if (m instanceof Delete) {
+ coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
+ }
}
- closeRegionOperation();
}
+ } finally {
+ if (lid != null) {
+ // 11. release the row lock
+ releaseRowLock(lid);
+ }
+ if (flush) {
+ // 12. Flush cache if needed. Do it outside update lock.
+ requestFlush();
+ }
+ closeRegionOperation();
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1232551&r1=1232550&r2=1232551&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Tue Jan 17 19:38:13 2012
@@ -23,11 +23,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@@ -94,62 +94,39 @@ public class ReplicationSink {
// to the same table.
try {
long totalReplicated = 0;
- // Map of table => list of puts, we only want to flushCommits once per
+ // Map of table => list of Rows, we only want to flushCommits once per
// invocation of this method per table.
- Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
+ Map<byte[], List<Row>> rows = new TreeMap<byte[], List<Row>>(Bytes.BYTES_COMPARATOR);
for (HLog.Entry entry : entries) {
WALEdit edit = entry.getEdit();
+ byte[] table = entry.getKey().getTablename();
+ Put put = null;
+ Delete del = null;
+ KeyValue lastKV = null;
List<KeyValue> kvs = edit.getKeyValues();
- if (kvs.get(0).isDelete()) {
- Delete delete = new Delete(kvs.get(0).getRow(),
- kvs.get(0).getTimestamp(), null);
- delete.setClusterId(entry.getKey().getClusterId());
- for (KeyValue kv : kvs) {
- switch (Type.codeToType(kv.getType())) {
- case DeleteFamily:
- // family marker
- delete.deleteFamily(kv.getFamily(), kv.getTimestamp());
- break;
- case DeleteColumn:
- // column marker
- delete.deleteColumns(kv.getFamily(), kv.getQualifier(),
- kv.getTimestamp());
- break;
- case Delete:
- // version marker
- delete.deleteColumn(kv.getFamily(), kv.getQualifier(),
- kv.getTimestamp());
- break;
- }
- }
- delete(entry.getKey().getTablename(), delete);
- } else {
- byte[] table = entry.getKey().getTablename();
- List<Put> tableList = puts.get(table);
- if (tableList == null) {
- tableList = new ArrayList<Put>();
- puts.put(table, tableList);
- }
- // With mini-batching, we need to expect multiple rows per edit
- byte[] lastKey = kvs.get(0).getRow();
- Put put = new Put(lastKey, kvs.get(0).getTimestamp());
- put.setClusterId(entry.getKey().getClusterId());
- for (KeyValue kv : kvs) {
- byte[] key = kv.getRow();
- if (!Bytes.equals(lastKey, key)) {
- tableList.add(put);
- put = new Put(key, kv.getTimestamp());
+ for (KeyValue kv : kvs) {
+ if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
+ if (kv.isDelete()) {
+ del = new Delete(kv.getRow());
+ del.setClusterId(entry.getKey().getClusterId());
+ addToMultiMap(rows, table, del);
+ } else {
+ put = new Put(kv.getRow());
put.setClusterId(entry.getKey().getClusterId());
+ addToMultiMap(rows, table, put);
}
+ }
+ if (kv.isDelete()) {
+ del.addDeleteMarker(kv);
+ } else {
put.add(kv);
- lastKey = key;
}
- tableList.add(put);
+ lastKV = kv;
}
totalReplicated++;
}
- for(byte [] table : puts.keySet()) {
- put(table, puts.get(table));
+ for(byte [] table : rows.keySet()) {
+ batch(table, rows.get(table));
}
this.metrics.setAgeOfLastAppliedOp(
entries[entries.length-1].getKey().getWriteTime());
@@ -162,39 +139,40 @@ public class ReplicationSink {
}
/**
- * Do the puts and handle the pool
- * @param tableName table to insert into
- * @param puts list of puts
- * @throws IOException
+ * Simple helper to a map from key to (a list of) values
+ * TODO: Make a general utility method
+ * @param map
+ * @param key
+ * @param value
+ * @return
*/
- private void put(byte[] tableName, List<Put> puts) throws IOException {
- if (puts.isEmpty()) {
- return;
- }
- HTableInterface table = null;
- try {
- table = this.pool.getTable(tableName);
- table.put(puts);
- this.metrics.appliedOpsRate.inc(puts.size());
- } finally {
- if (table != null) {
- table.close();
- }
+ private <K, V> List<V> addToMultiMap(Map<K, List<V>> map, K key, V value) {
+ List<V> values = map.get(key);
+ if (values == null) {
+ values = new ArrayList<V>();
+ map.put(key, values);
}
+ values.add(value);
+ return values;
}
/**
- * Do the delete and handle the pool
- * @param tableName table to delete in
- * @param delete the delete to use
+ * Do the changes and handle the pool
+ * @param tableName table to insert into
+ * @param rows list of actions
* @throws IOException
*/
- private void delete(byte[] tableName, Delete delete) throws IOException {
+ private void batch(byte[] tableName, List<Row> rows) throws IOException {
+ if (rows.isEmpty()) {
+ return;
+ }
HTableInterface table = null;
try {
table = this.pool.getTable(tableName);
- table.delete(delete);
- this.metrics.appliedOpsRate.inc(1);
+ table.batch(rows);
+ this.metrics.appliedOpsRate.inc(rows.size());
+ } catch (InterruptedException ix) {
+ throw new IOException(ix);
} finally {
if (table != null) {
table.close();
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1232551&r1=1232550&r2=1232551&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Tue Jan 17 19:38:13 2012
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertArr
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -4046,7 +4047,6 @@ public class TestFromClientSide {
Bytes.toBytes("a"), Bytes.toBytes("b")
};
RowMutation arm = new RowMutation(ROW);
- arm.add(new Delete(ROW));
Put p = new Put(ROW);
p.add(FAMILY, QUALIFIERS[0], VALUE);
arm.add(p);
@@ -4054,15 +4054,19 @@ public class TestFromClientSide {
Get g = new Get(ROW);
Result r = t.get(g);
- // delete was first, row should exist
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
arm = new RowMutation(ROW);
+ p = new Put(ROW);
+ p.add(FAMILY, QUALIFIERS[1], VALUE);
arm.add(p);
- arm.add(new Delete(ROW));
+ Delete d = new Delete(ROW);
+ d.deleteColumns(FAMILY, QUALIFIERS[0]);
+ arm.add(d);
t.batch(Arrays.asList((Row)arm));
r = t.get(g);
- assertTrue(r.isEmpty());
+ assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
+ assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
}
@Test
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java?rev=1232551&r1=1232550&r2=1232551&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java Tue Jan 17 19:38:13 2012
@@ -333,6 +333,8 @@ public class TestAtomicOperation extends
}
} catch (IOException e) {
e.printStackTrace();
+ failures.incrementAndGet();
+ fail();
}
}
}