You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2010/04/08 01:41:58 UTC
svn commit: r931723 - in /hadoop/hbase/branches/0.20: ./
src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/
src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/
src/contrib/transactional/src...
Author: jdcryans
Date: Wed Apr 7 23:41:58 2010
New Revision: 931723
URL: http://svn.apache.org/viewvc?rev=931723&view=rev
Log:
HBASE-2286 [Transactional Contrib] Correctly handle or avoid cases where
writes occur in same millisecond (Clint Morgan via J-D)
Modified:
hadoop/hbase/branches/0.20/CHANGES.txt
hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
Modified: hadoop/hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/CHANGES.txt?rev=931723&r1=931722&r2=931723&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.20/CHANGES.txt Wed Apr 7 23:41:58 2010
@@ -114,6 +114,8 @@ Release 0.20.4 - Unreleased
HBASE-2252 Mapping a very big table kills region servers
HBASE-2411 Findbugs target
HBASE-2412 [stargate] PerformanceEvaluation
+ HBASE-2286 [Transactional Contrib] Correctly handle or avoid cases where
+ writes occur in same millisecond (Clint Morgan via J-D)
NEW FEATURES
HBASE-2257 [stargate] multiuser mode
Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java?rev=931723&r1=931722&r2=931723&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java Wed Apr 7 23:41:58 2010
@@ -175,6 +175,9 @@ public class IndexedTable extends Transa
Result row = indexResult[i];
byte[] baseRow = row.getValue(INDEX_BASE_ROW_COLUMN);
+ if (baseRow == null) {
+ throw new IllegalStateException("Missing base row for indexed row: ["+Bytes.toString(row.getRow())+"]");
+ }
LOG.debug("next index row [" + Bytes.toString(row.getRow())
+ "] -> base row [" + Bytes.toString(baseRow) + "]");
Result baseResult = null;
@@ -195,7 +198,10 @@ public class IndexedTable extends Transa
}
if (baseResult != null) {
- results.addAll(baseResult.list());
+ List<KeyValue> list = baseResult.list();
+ if (list != null) {
+ results.addAll(list);
+ }
}
result[i] = new Result(results);
Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java?rev=931723&r1=931722&r2=931723&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java Wed Apr 7 23:41:58 2010
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Leases;
import org.apache.hadoop.hbase.client.Delete;
@@ -141,10 +142,32 @@ class IndexedRegion extends Transactiona
SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldResult);
for (IndexSpecification indexSpec : indexesToUpdate) {
- removeOldIndexEntry(indexSpec, put.getRow(), oldColumnValues);
- updateIndex(indexSpec, put.getRow(), newColumnValues);
+ updateIndex(indexSpec, put, newColumnValues, oldColumnValues);
}
}
+
+ // FIXME: This call takes place in an RPC, and requires an RPC. This makes for
+ // a likely deadlock if the number of RPCs we are trying to serve is >= the
+ // number of handler threads.
+ private void updateIndex(IndexSpecification indexSpec, Put put,
+ NavigableMap<byte[], byte[]> newColumnValues,
+ SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
+ Delete indexDelete = makeDeleteToRemoveOldIndexEntry(indexSpec, put.getRow(), oldColumnValues);
+ Put indexPut = makeIndexUpdate(indexSpec, put.getRow(), newColumnValues);
+
+ HTable indexTable = getIndexTable(indexSpec);
+ if (indexDelete != null && !Bytes.equals(indexDelete.getRow(), indexPut.getRow())) {
+ // Only do the delete if the row changed. This way we avoid the put-after-delete issues in HBASE-2256
+ LOG.debug("Deleting old index row ["+Bytes.toString(indexDelete.getRow())+"]. New row is ["+Bytes.toString(indexPut.getRow())+"].");
+ indexTable.delete(indexDelete);
+ } else if (indexDelete != null){
+ LOG.debug("Skipping deleting index row ["+Bytes.toString(indexDelete.getRow())+"] because it has not changed.");
+ }
+ indexTable.put(indexPut);
+ }
+
+
+
/** Return the columns needed for the update. */
private NavigableSet<byte[]> getColumnsForIndexes(Collection<IndexSpecification> indexes) {
@@ -157,7 +180,7 @@ class IndexedRegion extends Transactiona
return neededColumns;
}
- private void removeOldIndexEntry(IndexSpecification indexSpec, byte[] row,
+ private Delete makeDeleteToRemoveOldIndexEntry(IndexSpecification indexSpec, byte[] row,
SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
for (byte[] indexedCol : indexSpec.getIndexedColumns()) {
if (!oldColumnValues.containsKey(indexedCol)) {
@@ -165,7 +188,7 @@ class IndexedRegion extends Transactiona
+ "] not trying to remove old entry for row ["
+ Bytes.toString(row) + "] because col ["
+ Bytes.toString(indexedCol) + "] is missing");
- return;
+ return null;
}
}
@@ -173,7 +196,7 @@ class IndexedRegion extends Transactiona
oldColumnValues);
LOG.debug("Index [" + indexSpec.getIndexId() + "] removing old entry ["
+ Bytes.toString(oldIndexRow) + "]");
- getIndexTable(indexSpec).delete(new Delete(oldIndexRow));
+ return new Delete(oldIndexRow);
}
private NavigableMap<byte[], byte[]> getColumnsFromPut(Put put) {
@@ -204,23 +227,23 @@ class IndexedRegion extends Transactiona
return false;
}
- // FIXME: This call takes place in an RPC, and requires an RPC. This makes for
- // a likely deadlock if the number of RPCs we are trying to serve is >= the
- // number of handler threads.
- private void updateIndex(IndexSpecification indexSpec, byte[] row,
+ private Put makeIndexUpdate(IndexSpecification indexSpec, byte[] row,
SortedMap<byte[], byte[]> columnValues) throws IOException {
Put indexUpdate = IndexMaintenanceUtils.createIndexUpdate(indexSpec, row, columnValues);
- getIndexTable(indexSpec).put(indexUpdate);
LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry ["
+ Bytes.toString(indexUpdate.getRow()) + "] for row ["
+ Bytes.toString(row) + "]");
+ return indexUpdate;
+
}
+ // FIXME we can be smarter about this and avoid the base gets and index maintenance in many cases.
@Override
public void delete(Delete delete, final Integer lockid, boolean writeToWAL)
throws IOException {
- // First remove the existing indexes.
+ // First look at the current (to be the old) state.
+ SortedMap<byte[], byte[]> oldColumnValues = null;
if (!getIndexes().isEmpty()) {
// Need all columns
NavigableSet<byte[]> neededColumns = getColumnsForIndexes(getIndexes());
@@ -231,12 +254,7 @@ class IndexedRegion extends Transactiona
}
Result oldRow = super.get(get, lockid);
- SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldRow);
-
-
- for (IndexSpecification indexSpec : getIndexes()) {
- removeOldIndexEntry(indexSpec, delete.getRow(), oldColumnValues);
- }
+ oldColumnValues = convertToValueMap(oldRow);
}
super.delete(delete, lockid, writeToWAL);
@@ -246,17 +264,64 @@ class IndexedRegion extends Transactiona
// Rebuild index if there is still a version visible.
Result currentRow = super.get(get, lockid);
- if (!currentRow.isEmpty()) {
SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentRow);
- for (IndexSpecification indexSpec : getIndexes()) {
- if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, currentColumnValues)) {
- updateIndex(indexSpec, delete.getRow(), currentColumnValues);
+
+ for (IndexSpecification indexSpec : getIndexes()) {
+ Delete indexDelete = null;
+ if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, oldColumnValues)) {
+ indexDelete = makeDeleteToRemoveOldIndexEntry(indexSpec, delete
+ .getRow(), oldColumnValues);
+ }
+ Put indexPut = null;
+ if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec,
+ currentColumnValues)) {
+ indexPut = makeIndexUpdate(indexSpec, delete.getRow(),
+ currentColumnValues);
+ }
+ if (indexPut == null && indexDelete == null) {
+ continue;
+ }
+
+ HTable indexTable = getIndexTable(indexSpec);
+ if (indexDelete != null
+ && (indexPut == null || !Bytes.equals(indexDelete.getRow(),
+ indexPut.getRow()))) {
+ // Only do the delete if the row changed. This way we avoid the
+ // put-after-delete issues in HBASE-2256
+ LOG.debug("Deleting old index row ["
+ + Bytes.toString(indexDelete.getRow()) + "].");
+ indexTable.delete(indexDelete);
+ } else if (indexDelete != null) {
+ LOG.debug("Skipping deleting index row ["
+ + Bytes.toString(indexDelete.getRow())
+ + "] because it has not changed.");
+
+ for (byte [] indexCol : indexSpec.getAdditionalColumns()) {
+ byte[][] parsed = HStoreKey.parseColumn(indexCol);
+ List<KeyValue> famDeletes = delete.getFamilyMap().get(parsed[0]);
+ if (famDeletes != null) {
+ for (KeyValue kv : famDeletes) {
+ if (Bytes.equals(indexCol, kv.getColumn())) {
+ LOG.debug("Need to delete this specific column: "+Bytes.toString(kv.getColumn()));
+ Delete columnDelete = new Delete(indexDelete.getRow());
+ columnDelete.deleteColumns(indexCol);
+ indexTable.delete(columnDelete);
+ }
+ }
+
+ }
}
}
+
+ if (indexPut != null) {
+ getIndexTable(indexSpec).put(indexPut);
+ }
}
+
}
-
}
+
+
private SortedMap<byte[], byte[]> convertToValueMap(Result result) {
SortedMap<byte[], byte[]> currentColumnValues = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=931723&r1=931722&r2=931723&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Wed Apr 7 23:41:58 2010
@@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -138,17 +140,25 @@ class TransactionState {
}
void addWrite(final Put write) {
+ updateLatestTimestamp(write.getFamilyMap().values());
+ puts.add(write);
+ }
+
+
+ // FIXME REVIEW not sure about this. Needed for log recovery? but broke other tests.
+ private void updateLatestTimestamp(Collection<List<KeyValue>> kvsCollection) {
byte [] now = Bytes.toBytes(System.currentTimeMillis());
- // HAVE to manually set the KV timestamps
- for (List<KeyValue> kvs : write.getFamilyMap().values()) {
- for (KeyValue kv : kvs) {
+ // HAVE to manually set the KV timestamps
+ for (List<KeyValue> kvs : kvsCollection) {
+ for (KeyValue kv : kvs) {
+ if (kv.isLatestTimestamp()) {
kv.updateLatestStamp(now);
}
- }
-
- puts.add(write);
+ }
+ }
}
+
boolean hasWrite() {
return puts.size() > 0 || deletes.size() > 0;
}
@@ -158,6 +168,7 @@ class TransactionState {
}
void addDelete(final Delete delete) {
+ //updateLatestTimestamp(delete.getFamilyMap().values());
deletes.add(delete);
}
@@ -371,13 +382,6 @@ class TransactionState {
return deletes;
}
- /** Set deleteSet.
- * @param deleteSet the deleteSet to set
- */
- void setDeleteSet(List<Delete> deleteSet) {
- this.deletes = deleteSet;
- }
-
/** Get a scanner to go through the puts from this transaction. Used to weave together the local trx puts with the global state.
*
* @return scanner
@@ -393,20 +397,56 @@ class TransactionState {
*/
private class PutScanner implements KeyValueScanner, InternalScanner {
- private NavigableSet<KeyValue> kvSet;
+ private List<KeyValue> kvList;
private Iterator<KeyValue> iterator;
private boolean didHasNext = false;
private KeyValue next = null;
PutScanner() {
- kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+ kvList = new ArrayList<KeyValue>();
+
for (Put put : puts) {
for (List<KeyValue> putKVs : put.getFamilyMap().values()) {
- kvSet.addAll(putKVs);
+ kvList.addAll(putKVs);
+ }
+ }
+
+ Collections.sort(kvList, new Comparator<KeyValue>() {
+
+ /** We want to honor the order of the puts in the case where multiple have the same timestamp.
+ *
+ * @param o1
+ * @param o2
+ * @return
+ */
+ public int compare(KeyValue o1, KeyValue o2) {
+ int result = KeyValue.COMPARATOR.compare(o1, o2);
+ if (result != 0) {
+ return result;
+ }
+ if (o1 == o2) {
+ return 0;
+ }
+ int put1Number = getPutNumber(o1);
+ int put2Number = getPutNumber(o2);
+ return put2Number - put1Number;
+ }
+ });
+
+ iterator = kvList.iterator();
+ }
+
+ private int getPutNumber(KeyValue kv) {
+ for (int i=0; i < puts.size(); i++) {
+ for (List<KeyValue> putKVs : puts.get(i).getFamilyMap().values()) {
+ for (KeyValue putKV : putKVs)
+ if (putKV == kv) {
+ return i;
+ }
}
}
- iterator = kvSet.iterator();
+ throw new IllegalStateException("Can not fine put KV in puts");
}
public void close() {
@@ -424,8 +464,18 @@ class TransactionState {
return next;
}
+ private void iteratorFrom(KeyValue key) {
+ iterator = kvList.iterator();
+ while (iterator.hasNext()) {
+ KeyValue next = iterator.next();
+ if (KeyValue.COMPARATOR.compare(next, key) >= 0) {
+ break;
+ }
+ }
+ }
+
public boolean seek(KeyValue key) {
- iterator = kvSet.headSet(key).iterator();
+ iteratorFrom(key);
getNext();
return next != null;
Modified: hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java?rev=931723&r1=931722&r2=931723&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java (original)
+++ hadoop/hbase/branches/0.20/src/contrib/transactional/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java Wed Apr 7 23:41:58 2010
@@ -128,6 +128,25 @@ public class TestTransactions extends HB
Assert.assertEquals(newValue, Bytes.toInt(row1_A.value()));
}
+ public void testGetAfterPutPut() throws IOException {
+ TransactionState transactionState = transactionManager.beginTransaction();
+
+ int originalValue = Bytes.toInt(table.get(transactionState,
+ new Get(ROW1).addColumn(COL_A)).value());
+ int newValue = originalValue + 1;
+
+ table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+ .toBytes(newValue)));
+
+ newValue = newValue + 1;
+
+ table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+ .toBytes(newValue)));
+
+ Result row1_A = table.get(transactionState, new Get(ROW1).addColumn(COL_A));
+ Assert.assertEquals(newValue, Bytes.toInt(row1_A.value()));
+ }
+
public void testScanAfterUpdatePut() throws IOException {
TransactionState transactionState = transactionManager.beginTransaction();
@@ -174,14 +193,10 @@ public class TestTransactions extends HB
public void testPutPutScan() throws IOException {
TransactionState transactionState = transactionManager.beginTransaction();
+
int row2Value = 199;
table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
.toBytes(row2Value)));
- try {
- Thread.sleep(500);
- } catch (InterruptedException ex) {
- // just ignore
- }
row2Value = 299;
table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
@@ -198,8 +213,18 @@ public class TestTransactions extends HB
Assert.assertNotNull(result);
Assert.assertEquals(Bytes.toString(ROW2), Bytes.toString(result.getRow()));
Assert.assertEquals(row2Value, Bytes.toInt(result.value()));
+
+ // TODO commit and verify that we see the second put.
+ }
+
+ public void testPutPutScanOverAndOver() throws IOException {
+ // Do this test many times to try and hit two puts in the same millisecond
+ for (int i=0 ; i < 100; i++) {
+ testPutPutScan();
+ }
}
+
// Read from ROW1,COL_A and put it in ROW2_COLA and ROW3_COLA
private TransactionState makeTransaction1() throws IOException {
TransactionState transactionState = transactionManager.beginTransaction();