You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hbase.apache.org by jd...@apache.org on 2010/04/08 01:49:36 UTC
svn commit: r931727 - in /hadoop/hbase/trunk: ./
contrib/transactional/src/main/java/org/apache/hadoop/hbase/client/tableindexed/
contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/tableindexed/
contrib/transactional/src/main/java...
Author: jdcryans
Date: Wed Apr 7 23:49:35 2010
New Revision: 931727
URL: http://svn.apache.org/viewvc?rev=931727&view=rev
Log:
HBASE-2286 [Transactional Contrib] Correctly handle or avoid cases where
writes occur in same millisecond (Clint Morgan via J-D)
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
hadoop/hbase/trunk/contrib/transactional/src/test/java/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=931727&r1=931726&r2=931727&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Wed Apr 7 23:49:35 2010
@@ -486,6 +486,8 @@ Release 0.21.0 - Unreleased
HBASE-2252 Mapping a very big table kills region servers
HBASE-2412 [stargate] PerformanceEvaluation
HBASE-2419 Remove from RS logs the fat NotServingRegionException stack
+ HBASE-2286 [Transactional Contrib] Correctly handle or avoid cases where
+ writes occur in same millisecond (Clint Morgan via J-D)
NEW FEATURES
HBASE-1961 HBase EC2 scripts
Modified: hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java?rev=931727&r1=931726&r2=931727&view=diff
==============================================================================
--- hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java (original)
+++ hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java Wed Apr 7 23:49:35 2010
@@ -181,6 +181,9 @@ public class IndexedTable extends Transa
Result row = indexResult[i];
byte[] baseRow = row.getValue(INDEX_COL_FAMILY_NAME, INDEX_BASE_ROW);
+ if (baseRow == null) {
+ throw new IllegalStateException("Missing base row for indexed row: ["+Bytes.toString(row.getRow())+"]");
+ }
LOG.debug("next index row [" + Bytes.toString(row.getRow())
+ "] -> base row [" + Bytes.toString(baseRow) + "]");
Result baseResult = null;
@@ -209,7 +212,10 @@ public class IndexedTable extends Transa
}
if (baseResult != null) {
- results.addAll(baseResult.list());
+ List<KeyValue> list = baseResult.list();
+ if (list != null) {
+ results.addAll(list);
+ }
}
result[i] = new Result(results);
Modified: hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java?rev=931727&r1=931726&r2=931727&view=diff
==============================================================================
--- hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java (original)
+++ hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java Wed Apr 7 23:49:35 2010
@@ -147,11 +147,30 @@ class IndexedRegion extends Transactiona
SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldResult);
for (IndexSpecification indexSpec : indexesToUpdate) {
- removeOldIndexEntry(indexSpec, put.getRow(), oldColumnValues);
- updateIndex(indexSpec, put.getRow(), newColumnValues);
+ updateIndex(indexSpec, put, newColumnValues, oldColumnValues);
}
}
+ // FIXME: This call takes place in an RPC, and requires an RPC. This makes for
+ // a likely deadlock if the number of RPCs we are trying to serve is >= the
+ // number of handler threads.
+ private void updateIndex(IndexSpecification indexSpec, Put put,
+ NavigableMap<byte[], byte[]> newColumnValues,
+ SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
+ Delete indexDelete = makeDeleteToRemoveOldIndexEntry(indexSpec, put.getRow(), oldColumnValues);
+ Put indexPut = makeIndexUpdate(indexSpec, put.getRow(), newColumnValues);
+
+ HTable indexTable = getIndexTable(indexSpec);
+ if (indexDelete != null && !Bytes.equals(indexDelete.getRow(), indexPut.getRow())) {
+ // Only do the delete if the row changed. This way we save the put after delete issues in HBASE-2256
+ LOG.debug("Deleting old index row ["+Bytes.toString(indexDelete.getRow())+"]. New row is ["+Bytes.toString(indexPut.getRow())+"].");
+ indexTable.delete(indexDelete);
+ } else if (indexDelete != null){
+ LOG.debug("Skipping deleting index row ["+Bytes.toString(indexDelete.getRow())+"] because it has not changed.");
+ }
+ indexTable.put(indexPut);
+ }
+
/** Return the columns needed for the update. */
private NavigableSet<byte[]> getColumnsForIndexes(Collection<IndexSpecification> indexes) {
NavigableSet<byte[]> neededColumns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
@@ -163,7 +182,7 @@ class IndexedRegion extends Transactiona
return neededColumns;
}
- private void removeOldIndexEntry(IndexSpecification indexSpec, byte[] row,
+ private Delete makeDeleteToRemoveOldIndexEntry(IndexSpecification indexSpec, byte[] row,
SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
for (byte[] indexedCol : indexSpec.getIndexedColumns()) {
if (!oldColumnValues.containsKey(indexedCol)) {
@@ -171,7 +190,7 @@ class IndexedRegion extends Transactiona
+ "] not trying to remove old entry for row ["
+ Bytes.toString(row) + "] because col ["
+ Bytes.toString(indexedCol) + "] is missing");
- return;
+ return null;
}
}
@@ -179,7 +198,7 @@ class IndexedRegion extends Transactiona
oldColumnValues);
LOG.debug("Index [" + indexSpec.getIndexId() + "] removing old entry ["
+ Bytes.toString(oldIndexRow) + "]");
- getIndexTable(indexSpec).delete(new Delete(oldIndexRow));
+ return new Delete(oldIndexRow);
}
private NavigableMap<byte[], byte[]> getColumnsFromPut(Put put) {
@@ -212,23 +231,21 @@ class IndexedRegion extends Transactiona
return false;
}
- // FIXME: This call takes place in an RPC, and requires an RPC. This makes for
- // a likely deadlock if the number of RPCs we are trying to serve is >= the
- // number of handler threads.
- private void updateIndex(IndexSpecification indexSpec, byte[] row,
+ private Put makeIndexUpdate(IndexSpecification indexSpec, byte[] row,
SortedMap<byte[], byte[]> columnValues) throws IOException {
Put indexUpdate = IndexMaintenanceUtils.createIndexUpdate(indexSpec, row, columnValues);
- getIndexTable(indexSpec).put(indexUpdate);
LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry ["
+ Bytes.toString(indexUpdate.getRow()) + "] for row ["
+ Bytes.toString(row) + "]");
-
+ return indexUpdate;
}
+ // FIXME we can be smarter about this and avoid the base gets and index maintenance in many cases.
@Override
public void delete(Delete delete, final Integer lockid, boolean writeToWAL)
throws IOException {
- // First remove the existing indexes.
+ // First look at the current (to be the old) state.
+ SortedMap<byte[], byte[]> oldColumnValues = null;
if (!getIndexes().isEmpty()) {
// Need all columns
NavigableSet<byte[]> neededColumns = getColumnsForIndexes(getIndexes());
@@ -244,12 +261,7 @@ class IndexedRegion extends Transactiona
}
Result oldRow = super.get(get, lockid);
- SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldRow);
-
-
- for (IndexSpecification indexSpec : getIndexes()) {
- removeOldIndexEntry(indexSpec, delete.getRow(), oldColumnValues);
- }
+ oldColumnValues = convertToValueMap(oldRow);
}
super.delete(delete, lockid, writeToWAL);
@@ -259,13 +271,57 @@ class IndexedRegion extends Transactiona
// Rebuild index if there is still a version visible.
Result currentRow = super.get(get, lockid);
- if (!currentRow.isEmpty()) {
- SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentRow);
- for (IndexSpecification indexSpec : getIndexes()) {
- if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, currentColumnValues)) {
- updateIndex(indexSpec, delete.getRow(), currentColumnValues);
+ SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentRow);
+ for (IndexSpecification indexSpec : getIndexes()) {
+ Delete indexDelete = null;
+ if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, oldColumnValues)) {
+ indexDelete = makeDeleteToRemoveOldIndexEntry(indexSpec, delete
+ .getRow(), oldColumnValues);
+ }
+ Put indexPut = null;
+ if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec,
+ currentColumnValues)) {
+ indexPut = makeIndexUpdate(indexSpec, delete.getRow(),
+ currentColumnValues);
+ }
+ if (indexPut == null && indexDelete == null) {
+ continue;
+ }
+
+ HTable indexTable = getIndexTable(indexSpec);
+ if (indexDelete != null
+ && (indexPut == null || !Bytes.equals(indexDelete.getRow(),
+ indexPut.getRow()))) {
+ // Only do the delete if the row changed. This way we save the put
+ // after delete issues in HBASE-2256
+ LOG.debug("Deleting old index row ["
+ + Bytes.toString(indexDelete.getRow()) + "].");
+ indexTable.delete(indexDelete);
+ } else if (indexDelete != null) {
+ LOG.debug("Skipping deleting index row ["
+ + Bytes.toString(indexDelete.getRow())
+ + "] because it has not changed.");
+
+ for (byte [] indexCol : indexSpec.getAdditionalColumns()) {
+ byte[][] parsed = KeyValue.parseColumn(indexCol);
+ List<KeyValue> famDeletes = delete.getFamilyMap().get(parsed[0]);
+ if (famDeletes != null) {
+ for (KeyValue kv : famDeletes) {
+ if (Bytes.equals(parsed[0], kv.getFamily()) && Bytes.equals(parsed[1], kv.getQualifier())) {
+ LOG.debug("Need to delete this specific column: "+Bytes.toString(indexCol));
+ Delete columnDelete = new Delete(indexDelete.getRow());
+ columnDelete.deleteColumns(parsed[0],parsed[1]);
+ indexTable.delete(columnDelete);
+ }
+ }
+
+ }
}
}
+
+ if (indexPut != null) {
+ getIndexTable(indexSpec).put(indexPut);
+ }
}
}
Modified: hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=931727&r1=931726&r2=931727&view=diff
==============================================================================
--- hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original)
+++ hadoop/hbase/trunk/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Wed Apr 7 23:49:35 2010
@@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -138,15 +140,21 @@ class TransactionState {
}
void addWrite(final Put write) {
+ updateLatestTimestamp(write.getFamilyMap().values());
+ puts.add(write);
+ }
+
+ //FIXME REVIEW not sure about this. Needed for log recovery? but broke other tests.
+ private void updateLatestTimestamp(Collection<List<KeyValue>> kvsCollection) {
byte [] now = Bytes.toBytes(System.currentTimeMillis());
- // HAVE to manually set the KV timestamps
- for (List<KeyValue> kvs : write.getFamilyMap().values()) {
- for (KeyValue kv : kvs) {
- kv.updateLatestStamp(now);
- }
+ // HAVE to manually set the KV timestamps
+ for (List<KeyValue> kvs : kvsCollection) {
+ for (KeyValue kv : kvs) {
+ if (kv.isLatestTimestamp()) {
+ kv.updateLatestStamp(now);
+ }
}
-
- puts.add(write);
+ }
}
boolean hasWrite() {
@@ -371,13 +379,6 @@ class TransactionState {
return deletes;
}
- /** Set deleteSet.
- * @param deleteSet the deleteSet to set
- */
- void setDeleteSet(List<Delete> deleteSet) {
- this.deletes = deleteSet;
- }
-
/** Get a scanner to go through the puts from this transaction. Used to weave together the local trx puts with the global state.
*
* @return scanner
@@ -393,20 +394,54 @@ class TransactionState {
*/
private class PutScanner implements KeyValueScanner, InternalScanner {
- private NavigableSet<KeyValue> kvSet;
+ private List<KeyValue> kvList;
private Iterator<KeyValue> iterator;
private boolean didHasNext = false;
private KeyValue next = null;
PutScanner() {
- kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+ kvList = new ArrayList<KeyValue>();
for (Put put : puts) {
for (List<KeyValue> putKVs : put.getFamilyMap().values()) {
- kvSet.addAll(putKVs);
+ kvList.addAll(putKVs);
+ }
+ }
+ Collections.sort(kvList, new Comparator<KeyValue>() {
+
+ /** We want to honor the order of the puts in the case where multiple have the same timestamp.
+ *
+ * @param o1
+ * @param o2
+ * @return
+ */
+ public int compare(KeyValue o1, KeyValue o2) {
+ int result = KeyValue.COMPARATOR.compare(o1, o2);
+ if (result != 0) {
+ return result;
+ }
+ if (o1 == o2) {
+ return 0;
+ }
+ int put1Number = getPutNumber(o1);
+ int put2Number = getPutNumber(o2);
+ return put2Number - put1Number;
+ }
+ });
+
+ iterator = kvList.iterator();
+ }
+
+ private int getPutNumber(KeyValue kv) {
+ for (int i=0; i < puts.size(); i++) {
+ for (List<KeyValue> putKVs : puts.get(i).getFamilyMap().values()) {
+ for (KeyValue putKV : putKVs)
+ if (putKV == kv) {
+ return i;
+ }
}
}
- iterator = kvSet.iterator();
+ throw new IllegalStateException("Can not fine put KV in puts");
}
public void close() {
@@ -424,8 +459,18 @@ class TransactionState {
return next;
}
+ private void iteratorFrom(KeyValue key) {
+ iterator = kvList.iterator();
+ while (iterator.hasNext()) {
+ KeyValue next = iterator.next();
+ if (KeyValue.COMPARATOR.compare(next, key) >= 0) {
+ break;
+ }
+ }
+ }
+
public boolean seek(KeyValue key) {
- iterator = kvSet.headSet(key).iterator();
+ iteratorFrom(key);
getNext();
return next != null;
Modified: hadoop/hbase/trunk/contrib/transactional/src/test/java/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/contrib/transactional/src/test/java/org/apache/hadoop/hbase/client/transactional/TestTransactions.java?rev=931727&r1=931726&r2=931727&view=diff
==============================================================================
--- hadoop/hbase/trunk/contrib/transactional/src/test/java/org/apache/hadoop/hbase/client/transactional/TestTransactions.java (original)
+++ hadoop/hbase/trunk/contrib/transactional/src/test/java/org/apache/hadoop/hbase/client/transactional/TestTransactions.java Wed Apr 7 23:49:35 2010
@@ -127,6 +127,25 @@ public class TestTransactions extends HB
Assert.assertEquals(newValue, Bytes.toInt(row1_A.value()));
}
+ public void testGetAfterPutPut() throws IOException {
+ TransactionState transactionState = transactionManager.beginTransaction();
+
+ int originalValue = Bytes.toInt(table.get(transactionState,
+ new Get(ROW1).addColumn(FAMILY, QUAL_A)).value());
+ int newValue = originalValue + 1;
+
+ table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+ .toBytes(newValue)));
+
+ newValue = newValue + 1;
+
+ table.put(transactionState, new Put(ROW1).add(FAMILY, QUAL_A, Bytes
+ .toBytes(newValue)));
+
+ Result row1_A = table.get(transactionState, new Get(ROW1).addColumn(FAMILY, QUAL_A));
+ Assert.assertEquals(newValue, Bytes.toInt(row1_A.value()));
+ }
+
public void testScanAfterUpdatePut() throws IOException {
TransactionState transactionState = transactionManager.beginTransaction();
@@ -176,11 +195,6 @@ public class TestTransactions extends HB
int row2Value = 199;
table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
.toBytes(row2Value)));
- try {
- Thread.sleep(500);
- } catch (InterruptedException ex) {
- // just ignore
- }
row2Value = 299;
table.put(transactionState, new Put(ROW2).add(FAMILY, QUAL_A, Bytes
@@ -197,8 +211,17 @@ public class TestTransactions extends HB
Assert.assertNotNull(result);
Assert.assertEquals(Bytes.toString(ROW2), Bytes.toString(result.getRow()));
Assert.assertEquals(row2Value, Bytes.toInt(result.value()));
+
+ // TODO commit and verifty that we see second put.
}
+ public void testPutPutScanOverAndOver() throws IOException {
+ // Do this test many times to try and hit two puts in the same millisecond
+ for (int i=0 ; i < 100; i++) {
+ testPutPutScan();
+ }
+ }
+
// Read from ROW1,COL_A and put it in ROW2_COLA and ROW3_COLA
private TransactionState makeTransaction1() throws IOException {
TransactionState transactionState = transactionManager.beginTransaction();