You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/01/15 02:21:56 UTC
[2/3] hbase git commit: --whitespace=fix
--whitespace=fix
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/930f68c0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/930f68c0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/930f68c0
Branch: refs/heads/branch-1.2
Commit: 930f68c0b976a600066b838283a0f3dce050256f
Parents: fd72340
Author: stack <st...@apache.org>
Authored: Thu Jan 14 12:00:53 2016 -0800
Committer: stack <st...@apache.org>
Committed: Thu Jan 14 12:00:53 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Scan.java | 2 +
.../hadoop/hbase/client/TestIncrement.java | 2 +-
.../main/java/org/apache/hadoop/hbase/Tag.java | 29 +-
.../hadoop/hbase/regionserver/HRegion.java | 556 ++++++++++++-------
.../MultiVersionConcurrencyControl.java | 5 +-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 14 +-
.../hadoop/hbase/IncrementPerformanceTest.java | 129 +++++
.../hadoop/hbase/client/TestFromClientSide.java | 263 +--------
.../hbase/client/TestFromClientSide3.java | 5 +-
.../hbase/client/TestFromClientSideNoCodec.java | 2 +-
.../TestFromClientSideWithCoprocessor.java | 2 +-
...tIncrementFromClientSideWithCoprocessor.java | 49 ++
.../client/TestIncrementsFromClientSide.java | 433 +++++++++++++++
.../mapreduce/TestTableInputFormatScanBase.java | 5 +-
.../hbase/regionserver/TestAtomicOperation.java | 31 +-
.../hbase/regionserver/TestRegionIncrement.java | 254 +++++++++
.../hadoop/hbase/regionserver/TestTags.java | 2 +-
17 files changed, 1290 insertions(+), 493 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 4825cca..b13837d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -220,6 +220,7 @@ public class Scan extends Query {
filter = scan.getFilter(); // clone?
loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
consistency = scan.getConsistency();
+ this.setIsolationLevel(scan.getIsolationLevel());
reversed = scan.isReversed();
small = scan.isSmall();
allowPartialResults = scan.getAllowPartialResults();
@@ -262,6 +263,7 @@ public class Scan extends Query {
this.familyMap = get.getFamilyMap();
this.getScan = true;
this.consistency = get.getConsistency();
+ this.setIsolationLevel(get.getIsolationLevel());
for (Map.Entry<String, byte[]> attr : get.getAttributesMap().entrySet()) {
setAttribute(attr.getKey(), attr.getValue());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java
index 8a2c447..39cde45 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java
@@ -29,7 +29,7 @@ import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestIncrement {
@Test
- public void test() {
+ public void testIncrementInstance() {
final long expected = 13;
Increment inc = new Increment(new byte [] {'r'});
int total = 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
index 2e7314d..d0719f0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -180,6 +181,7 @@ public class Tag {
* @return the serialized tag data as bytes
*/
public static byte[] fromList(List<Tag> tags) {
+ if (tags == null || tags.size() <= 0) return null;
int length = 0;
for (Tag tag: tags) {
length += tag.length;
@@ -226,4 +228,29 @@ public class Tag {
int getOffset() {
return this.offset;
}
-}
+
+
+ /**
+ * @return A List&lt;Tag&gt; of any Tags found in <code>cell</code> else null.
+ */
+ public static List<Tag> carryForwardTags(final Cell cell) {
+ return carryForwardTags(null, cell);
+ }
+
+ /**
+ * @return <code>tagsOrNull</code> with any Tags <code>cell</code> is carrying added to it; a
+ * new List&lt;Tag&gt; if <code>tagsOrNull</code> is null and the cell carries Tags; or null if
+ * the cell carries no Tags AND the passed in <code>tagsOrNull</code> is null.
+ */
+ public static List<Tag> carryForwardTags(final List<Tag> tagsOrNull, final Cell cell) {
+ List<Tag> tags = tagsOrNull;
+ if (cell.getTagsLength() <= 0) return tags;
+ Iterator<Tag> itr =
+ CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
+ if (tags == null) tags = new ArrayList<Tag>();
+ while (itr.hasNext()) {
+ tags.add(itr.next());
+ }
+ return tags;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index cfd057a..f1566af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -150,8 +151,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputCont
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
@@ -215,6 +216,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000;
/**
+ * Set region to take the fast increment path. Constraint is that caller can only access the
+ * Cell via Increment; intermixing Increment with other Mutations will give indeterminate
+ * results. A Get with {@link IsolationLevel#READ_UNCOMMITTED} will get the latest increment
+ * or an Increment of zero will do the same.
+ */
+ public static final String INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY =
+ "hbase.increment.fast.but.narrow.consistency";
+ private final boolean incrementFastButNarrowConsistency;
+
+ /**
* This is the global default value for durability. All tables/mutations not
* defining a durability or using USE_DEFAULT will default to this value.
*/
@@ -745,6 +756,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
false :
conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
+
+ // See #INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY for what this flag is about.
+ this.incrementFastButNarrowConsistency =
+ this.conf.getBoolean(INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, false);
}
void setHTableSpecificConf() {
@@ -3592,30 +3607,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int listSize = cells.size();
for (int i = 0; i < listSize; i++) {
Cell cell = cells.get(i);
- List<Tag> newTags = new ArrayList<Tag>();
- Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
- cell.getTagsOffset(), cell.getTagsLength());
-
- // Carry forward existing tags
-
- while (tagIterator.hasNext()) {
-
- // Add any filters or tag specific rewrites here
-
- newTags.add(tagIterator.next());
- }
-
- // Cell TTL handling
-
- // Check again if we need to add a cell TTL because early out logic
- // above may change when there are more tag based features in core.
- if (m.getTTL() != Long.MAX_VALUE) {
- // Add a cell TTL tag
- newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
- }
+ List<Tag> newTags = Tag.carryForwardTags(null, cell);
+ newTags = carryForwardTTLTag(newTags, m);
// Rewrite the cell with the updated set of tags
-
cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
@@ -7170,24 +7165,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Process cell tags
// Make a union of the set of tags in the old and new KVs
- List<Tag> newTags = carryForwardTags(oldCell, new ArrayList<Tag>());
- newTags = carryForwardTags(cell, newTags);
-
- // Cell TTL handling
-
- if (mutate.getTTL() != Long.MAX_VALUE) {
- // Add the new TTL tag
- newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));
- }
+ List<Tag> tags = Tag.carryForwardTags(null, oldCell);
+ tags = Tag.carryForwardTags(tags, cell);
+ tags = carryForwardTTLTag(tags, mutate);
// Rebuild tags
- byte[] tagBytes = Tag.fromList(newTags);
+ byte[] tagBytes = Tag.fromList(tags);
// allocate an empty cell once
newCell = new KeyValue(row.length, cell.getFamilyLength(),
cell.getQualifierLength(), ts, KeyValue.Type.Put,
oldCell.getValueLength() + cell.getValueLength(),
- tagBytes.length);
+ tagBytes == null? 0: tagBytes.length);
// copy in row, family, and qualifier
System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
@@ -7206,8 +7195,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
newCell.getValueOffset() + oldCell.getValueLength(),
cell.getValueLength());
// Copy in tag data
- System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
- tagBytes.length);
+ if (tagBytes != null) {
+ System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
+ tagBytes.length);
+ }
idx++;
} else {
// Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP
@@ -7216,8 +7207,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Cell TTL handling
if (mutate.getTTL() != Long.MAX_VALUE) {
- List<Tag> newTags = new ArrayList<Tag>(1);
- newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));
// Add the new TTL tag
newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength(),
@@ -7227,7 +7216,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
cell.getQualifierLength(),
cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
- newTags);
+ carryForwardTTLTag(mutate));
} else {
newCell = cell;
}
@@ -7362,174 +7351,218 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public Result increment(Increment mutation, long nonceGroup, long nonce)
throws IOException {
Operation op = Operation.INCREMENT;
- byte [] row = mutation.getRow();
- checkRow(row, op.toString());
- boolean flush = false;
- Durability durability = getEffectiveDurability(mutation.getDurability());
- boolean writeToWAL = durability != Durability.SKIP_WAL;
- WALEdit walEdits = null;
- List<Cell> allKVs = new ArrayList<Cell>(mutation.size());
-
- Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
- long size = 0;
- long txid = 0;
checkReadOnly();
checkResources();
- // Lock row
+ checkRow(mutation.getRow(), op.toString());
startRegionOperation(op);
this.writeRequestsCount.increment();
- RowLock rowLock = null;
+ try {
+ // Which Increment is it? Narrow increment-only consistency or slow (default) and general
+ // row-wide consistency.
+
+ // So, difference between fastAndNarrowConsistencyIncrement and slowButConsistentIncrement is
+ // that the former holds the row lock until the sync completes; this allows us to reason that
+ // there are no other writers afoot when we read the current increment value. The row lock
+ // means that we do not need to wait on mvcc reads to catch up to writes before we proceed
+ // with the read, the root of the slowdown seen in HBASE-14460. The fast-path also does not
+ // wait on mvcc to complete before returning to the client. We also reorder the write so that
+ // the update of memstore happens AFTER sync returns; i.e. the write pipeline does less
+ // zigzagging now.
+ //
+ // See the comment on INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY
+ // for the constraints that apply when you take this code path; it is correct but only if
+ // Increments alone are used to mutate an Increment Cell; mixing concurrent Put+Delete and
+ // Increment will yield indeterminate results.
+ return this.incrementFastButNarrowConsistency?
+ fastAndNarrowConsistencyIncrement(mutation, nonceGroup, nonce):
+ slowButConsistentIncrement(mutation, nonceGroup, nonce);
+ } finally {
+ if (this.metricsRegion != null) this.metricsRegion.updateIncrement();
+ closeRegionOperation(Operation.INCREMENT);
+ }
+ }
+
+ /**
+ * The bulk of this method is a copy-and-paste of the slowButConsistentIncrement but with some
+ * reordering to enable the fast increment (reordering allows us to also drop some state
+ * carrying Lists and variables so the flow here is more straight-forward). We copy-and-paste
+ * because we cannot break down the method further into smaller pieces. Too much state. Will redo
+ * in trunk and tip of branch-1 to undo duplication here and in append, checkAnd*, etc. For why
+ * this route is 'faster' than the alternative slowButConsistentIncrement path, see the comment
+ * in calling method.
+ * @return Resulting increment
+ * @throws IOException
+ */
+ private Result fastAndNarrowConsistencyIncrement(Increment increment, long nonceGroup,
+ long nonce)
+ throws IOException {
+ long accumulatedResultSize = 0;
WALKey walKey = null;
- MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
- boolean doRollBackMemstore = false;
- TimeRange tr = mutation.getTimeRange();
+ long txid = 0;
+ // This is all kvs accumulated during this increment processing. Includes increments where the
+ // increment is zero: i.e. client just wants to get current state of the increment w/o
+ // changing it. These latter increments by zero are NOT added to the WAL.
+ List<Cell> allKVs = new ArrayList<Cell>(increment.size());
+ Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
+ RowLock rowLock = getRowLock(increment.getRow());
try {
- rowLock = getRowLock(row);
- assert rowLock != null;
+ lock(this.updatesLock.readLock());
try {
- lock(this.updatesLock.readLock());
- try {
- // wait for all prior MVCC transactions to finish - while we hold the row lock
- // (so that we are guaranteed to see the latest state)
- mvcc.await();
- if (this.coprocessorHost != null) {
- Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation);
- if (r != null) {
- return r;
+ if (this.coprocessorHost != null) {
+ Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
+ if (r != null) return r;
+ }
+ long now = EnvironmentEdgeManager.currentTime();
+ final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
+ WALEdit walEdits = null;
+ // Process increments a Store/family at a time.
+ // Accumulate edits for memstore to add later after we've added to WAL.
+ Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>();
+ for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) {
+ byte [] columnFamilyName = entry.getKey();
+ List<Cell> increments = entry.getValue();
+ Store store = this.stores.get(columnFamilyName);
+ // Do increment for this store; be sure to 'sort' the increments first so increments
+ // match order in which we get back current Cells when we get.
+ List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName,
+ sort(increments, store.getComparator()), now,
+ MultiVersionConcurrencyControl.NO_WRITE_NUMBER, allKVs,
+ IsolationLevel.READ_UNCOMMITTED);
+ if (!results.isEmpty()) {
+ forMemStore.put(store, results);
+ // Prepare WAL updates
+ if (writeToWAL) {
+ if (walEdits == null) walEdits = new WALEdit();
+ walEdits.getCells().addAll(results);
}
}
- long now = EnvironmentEdgeManager.currentTime();
- // Process each family
- for (Map.Entry<byte [], List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
- Store store = stores.get(family.getKey());
- List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
-
- List<Cell> results = doGet(store, row, family, tr);
-
- // Iterate the input columns and update existing values if they were
- // found, otherwise add new column initialized to the increment amount
-
- // Avoid as much copying as possible. We may need to rewrite and
- // consolidate tags. Bytes are only copied once.
- // Would be nice if KeyValue had scatter/gather logic
- int idx = 0;
- // HERE WE DIVERGE FROM APPEND
- List<Cell> edits = family.getValue();
- for (int i = 0; i < edits.size(); i++) {
- Cell cell = edits.get(i);
- long amount = Bytes.toLong(CellUtil.cloneValue(cell));
- boolean noWriteBack = (amount == 0);
-
- List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>());
-
- Cell c = null;
- long ts = now;
- if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
- c = results.get(idx);
- ts = Math.max(now, c.getTimestamp());
- if(c.getValueLength() == Bytes.SIZEOF_LONG) {
- amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
- } else {
- // throw DoNotRetryIOException instead of IllegalArgumentException
- throw new org.apache.hadoop.hbase.DoNotRetryIOException(
- "Attempted to increment field that isn't 64 bits wide");
- }
- // Carry tags forward from previous version
- newTags = carryForwardTags(c, newTags);
- if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) {
- idx++;
- }
- }
-
- // Append new incremented KeyValue to list
- byte[] q = CellUtil.cloneQualifier(cell);
- byte[] val = Bytes.toBytes(amount);
+ }
- // Add the TTL tag if the mutation carried one
- if (mutation.getTTL() != Long.MAX_VALUE) {
- newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL())));
- }
+ // Actually write to WAL now. If walEdits is non-empty, we write the WAL.
+ if (walEdits != null && !walEdits.isEmpty()) {
+ // Using default cluster id, as this can only happen in the originating cluster.
+ // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
+ // here instead of WALKey directly to support legacy coprocessors.
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce,
+ getMVCC());
+ txid =
+ this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
+ } else {
+ // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
+ walKey = appendEmptyEdit(this.wal);
+ }
- Cell newKV = new KeyValue(row, 0, row.length,
- family.getKey(), 0, family.getKey().length,
- q, 0, q.length,
- ts,
- KeyValue.Type.Put,
- val, 0, val.length,
- newTags);
+ if (txid != 0) syncOrDefer(txid, effectiveDurability);
- // Give coprocessors a chance to update the new cell
- if (coprocessorHost != null) {
- newKV = coprocessorHost.postMutationBeforeWAL(
- RegionObserver.MutationType.INCREMENT, mutation, c, newKV);
- }
- allKVs.add(newKV);
+ // Now write to memstore.
+ for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
+ Store store = entry.getKey();
+ List<Cell> results = entry.getValue();
+ if (store.getFamily().getMaxVersions() == 1) {
+ // Upsert if VERSIONS for this CF == 1. Use write sequence id rather than read point
+ // when doing fast increment.
+ accumulatedResultSize += store.upsert(results, getSmallestReadPoint());
+ } else {
+ // Otherwise keep older versions around
+ for (Cell cell: results) {
+ accumulatedResultSize += store.add(cell);
+ }
+ }
+ }
- if (!noWriteBack) {
- kvs.add(newKV);
+ // Tell mvcc this write is complete.
+ this.mvcc.complete(walKey.getWriteEntry());
+ walKey = null;
+ } finally {
+ this.updatesLock.readLock().unlock();
+ }
+ } finally {
+ // walKey is not null if above processing failed... cleanup the mvcc transaction.
+ if (walKey != null) this.mvcc.complete(walKey.getWriteEntry());
+ rowLock.release();
+ }
+ // Request a cache flush. Do it outside update lock.
+ if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush();
+ return increment.isReturnResults() ? Result.create(allKVs) : null;
+ }
- // Prepare WAL updates
- if (writeToWAL) {
- if (walEdits == null) {
- walEdits = new WALEdit();
- }
- walEdits.add(newKV);
- }
+ private Result slowButConsistentIncrement(Increment increment, long nonceGroup, long nonce)
+ throws IOException {
+ RowLock rowLock = null;
+ WALKey walKey = null;
+ boolean doRollBackMemstore = false;
+ long accumulatedResultSize = 0;
+ List<Cell> allKVs = new ArrayList<Cell>(increment.size());
+ List<Cell> memstoreCells = new ArrayList<Cell>();
+ Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
+ try {
+ rowLock = getRowLock(increment.getRow());
+ long txid = 0;
+ try {
+ lock(this.updatesLock.readLock());
+ try {
+ // Wait for all prior MVCC transactions to finish - while we hold the row lock
+ // (so that we are guaranteed to see the latest increment)
+ this.mvcc.await();
+ if (this.coprocessorHost != null) {
+ Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
+ if (r != null) return r;
+ }
+ long now = EnvironmentEdgeManager.currentTime();
+ final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
+ WALEdit walEdits = null;
+ // Process increments a Store/family at a time.
+ // Accumulate edits for memstore to add later after we've added to WAL.
+ Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>();
+ for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) {
+ byte [] columnFamilyName = entry.getKey();
+ List<Cell> increments = entry.getValue();
+ Store store = this.stores.get(columnFamilyName);
+ // Do increment for this store; be sure to 'sort' the increments first so increments
+ // match order in which we get back current Cells when we get.
+ List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName,
+ sort(increments, store.getComparator()), now,
+ MultiVersionConcurrencyControl.NO_WRITE_NUMBER, allKVs, null);
+ if (!results.isEmpty()) {
+ forMemStore.put(store, results);
+ // Prepare WAL updates
+ if (writeToWAL) {
+ if (walEdits == null) walEdits = new WALEdit();
+ walEdits.getCells().addAll(results);
}
}
-
- //store the kvs to the temporary memstore before writing WAL
- if (!kvs.isEmpty()) {
- tempMemstore.put(store, kvs);
- }
}
-
- // Actually write to WAL now
+ // Actually write to WAL now. If walEdits is non-empty, we write the WAL.
if (walEdits != null && !walEdits.isEmpty()) {
- if (writeToWAL) {
- // Using default cluster id, as this can only happen in the originating
- // cluster. A slave cluster receives the final value (not the delta)
- // as a Put.
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(),
- WALKey.NO_SEQUENCE_ID,
- nonceGroup,
- nonce,
- mvcc);
- txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
- walKey, walEdits, true);
- } else {
- recordMutationWithoutWal(mutation.getFamilyCellMap());
- }
- }
- if (walKey == null) {
+ // Using default cluster id, as this can only happen in the originating cluster.
+ // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
+ // here instead of WALKey directly to support legacy coprocessors.
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce,
+ getMVCC());
+ txid =
+ this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
+ } else {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
walKey = this.appendEmptyEdit(this.wal);
}
- // now start my own transaction
- writeEntry = walKey.getWriteEntry();
-
- // Actually write to Memstore now
- if (!tempMemstore.isEmpty()) {
- for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
- Store store = entry.getKey();
- if (store.getFamily().getMaxVersions() == 1) {
- // upsert if VERSIONS for this CF == 1
- // Is this right? It immediately becomes visible? St.Ack 20150907
- size += store.upsert(entry.getValue(), getSmallestReadPoint());
- } else {
- // otherwise keep older versions around
- for (Cell cell : entry.getValue()) {
- CellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
- size += store.add(cell);
- doRollBackMemstore = true;
- }
+ // Now write to memstore, a family at a time.
+ for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
+ Store store = entry.getKey();
+ List<Cell> results = entry.getValue();
+ if (store.getFamily().getMaxVersions() == 1) {
+ // Upsert if VERSIONS for this CF == 1
+ accumulatedResultSize += store.upsert(results, getSmallestReadPoint());
+ // TODO: St.Ack 20151222 Why no rollback in this case?
+ } else {
+ // Otherwise keep older versions around
+ for (Cell cell: results) {
+ accumulatedResultSize += store.add(cell);
+ doRollBackMemstore = true;
}
}
- size = this.addAndGetGlobalMemstoreSize(size);
- flush = isFlushSize(size);
}
} finally {
this.updatesLock.readLock().unlock();
@@ -7539,34 +7572,165 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rowLock = null;
}
// sync the transaction log outside the rowlock
- if(txid != 0){
+ if(txid != 0) {
syncOrDefer(txid, durability);
}
+ mvcc.completeAndWait(walKey.getWriteEntry());
+ walKey = null;
doRollBackMemstore = false;
} finally {
if (rowLock != null) {
rowLock.release();
}
// if the wal sync was unsuccessful, remove keys from memstore
- if (doRollBackMemstore) {
- for(List<Cell> cells: tempMemstore.values()) {
- rollbackMemstore(cells);
- }
- if (writeEntry != null) mvcc.complete(writeEntry);
- } else if (writeEntry != null) {
- mvcc.completeAndWait(writeEntry);
- }
+ if (doRollBackMemstore) rollbackMemstore(memstoreCells);
+ if (walKey != null) mvcc.complete(walKey.getWriteEntry());
closeRegionOperation(Operation.INCREMENT);
if (this.metricsRegion != null) {
this.metricsRegion.updateIncrement();
}
}
- if (flush) {
- // Request a cache flush. Do it outside update lock.
- requestFlush();
+ // Request a cache flush. Do it outside update lock.
+ if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush();
+ return increment.isReturnResults() ? Result.create(allKVs) : null;
+ }
+
+ /**
+ * @return Sorted list of <code>cells</code> using <code>comparator</code>
+ */
+ private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) {
+ Collections.sort(cells, comparator);
+ return cells;
+ }
+
+ /**
+ * Apply increments to a column family.
+ * @param sortedIncrements The passed in increments to apply MUST be sorted so that they match
+ * the order that they appear in the Get results (get results will be sorted on return).
+ * Otherwise, we won't be able to find the existing values if the cells are not specified in
+ * order by the client since cells are in an array list.
+ * @param isolation Isolation level to use when running the 'get'. Pass null for default.
+ * @return Resulting increments after <code>sortedIncrements</code> have been applied to current
+ * values (if any -- else passed increment is the final result).
+ * @throws IOException
+ */
+ private List<Cell> applyIncrementsToColumnFamily(Increment increment, byte[] columnFamilyName,
+ List<Cell> sortedIncrements, long now, long mvccNum, List<Cell> allKVs,
+ final IsolationLevel isolation)
+ throws IOException {
+ List<Cell> results = new ArrayList<Cell>(sortedIncrements.size());
+ byte [] row = increment.getRow();
+ // Get previous values for all columns in this family
+ List<Cell> currentValues =
+ getIncrementCurrentValue(increment, columnFamilyName, sortedIncrements, isolation);
+ // Iterate the input columns and update existing values if they were found, otherwise
+ // add new column initialized to the increment amount
+ int idx = 0;
+ for (int i = 0; i < sortedIncrements.size(); i++) {
+ Cell inc = sortedIncrements.get(i);
+ long incrementAmount = getLongValue(inc);
+ // If increment amount == 0, then don't write this Increment to the WAL.
+ boolean writeBack = (incrementAmount != 0);
+ // Carry forward any tags that might have been added by a coprocessor.
+ List<Tag> tags = Tag.carryForwardTags(inc);
+
+ Cell currentValue = null;
+ long ts = now;
+ if (idx < currentValues.size() && CellUtil.matchingQualifier(currentValues.get(idx), inc)) {
+ currentValue = currentValues.get(idx);
+ ts = Math.max(now, currentValue.getTimestamp());
+ incrementAmount += getLongValue(currentValue);
+ // Carry forward all tags
+ tags = Tag.carryForwardTags(tags, currentValue);
+ if (i < (sortedIncrements.size() - 1) &&
+ !CellUtil.matchingQualifier(inc, sortedIncrements.get(i + 1))) idx++;
+ }
+
+ // Append new incremented KeyValue to list
+ byte [] qualifier = CellUtil.cloneQualifier(inc);
+ byte [] incrementAmountInBytes = Bytes.toBytes(incrementAmount);
+ tags = carryForwardTTLTag(tags, increment);
+
+ Cell newValue = new KeyValue(row, 0, row.length,
+ columnFamilyName, 0, columnFamilyName.length,
+ qualifier, 0, qualifier.length,
+ ts, KeyValue.Type.Put,
+ incrementAmountInBytes, 0, incrementAmountInBytes.length,
+ tags);
+
+ // Don't set an mvcc if none specified. The mvcc may be assigned later in case where we
+ // write the memstore AFTER we sync our edit to the log.
+ if (mvccNum != MultiVersionConcurrencyControl.NO_WRITE_NUMBER) {
+ CellUtil.setSequenceId(newValue, mvccNum);
+ }
+
+ // Give coprocessors a chance to update the new cell
+ if (coprocessorHost != null) {
+ newValue = coprocessorHost.postMutationBeforeWAL(
+ RegionObserver.MutationType.INCREMENT, increment, currentValue, newValue);
+ }
+ allKVs.add(newValue);
+ if (writeBack) {
+ results.add(newValue);
+ }
+ }
+ return results;
+ }
+
+ /**
+ * @return Get the long out of the passed in Cell
+ * @throws DoNotRetryIOException
+ */
+ private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
+ int len = cell.getValueLength();
+ if (len != Bytes.SIZEOF_LONG) {
+ // throw DoNotRetryIOException instead of IllegalArgumentException
+ throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");
}
- return mutation.isReturnResults() ? Result.create(allKVs) : null;
+ return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len);
+ }
+
+ /**
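+ * Do a specific Get on passed <code>columnFamily</code> and column qualifiers
+ * from <code>increments</code> only.
+ * @param increment Increment supplying the row and time range for the Get
+ * @param columnFamily Column family to read from
+ * @param increments Cells whose qualifiers name the columns to fetch
+ * @return The Cells returned by the Get for the columns being incremented
+ * @throws IOException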
+ */
+ private List<Cell> getIncrementCurrentValue(final Increment increment, byte [] columnFamily,
+ final List<Cell> increments, final IsolationLevel isolation)
+ throws IOException {
+ Get get = new Get(increment.getRow());
+ if (isolation != null) get.setIsolationLevel(isolation);
+ for (Cell cell: increments) {
+ get.addColumn(columnFamily, CellUtil.cloneQualifier(cell));
+ }
+ TimeRange tr = increment.getTimeRange();
+ get.setTimeRange(tr.getMin(), tr.getMax());
+ return get(get, false);
+ }
+
+ private static List<Tag> carryForwardTTLTag(final Mutation mutation) {
+ return carryForwardTTLTag(null, mutation);
+ }
+
+ /**
+ * @return Carry forward the TTL tag if the increment is carrying one
+ */
+ private static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull,
+ final Mutation mutation) {
+ long ttl = mutation.getTTL();
+ if (ttl == Long.MAX_VALUE) return tagsOrNull;
+ List<Tag> tags = tagsOrNull;
+ // If we are making the array in here, given we are the last thing checked, we'll be the only
+ // thing in the array so set its size to '1' (I saw this being done in earlier version of
+ // tag-handling).
+ if (tags == null) tags = new ArrayList<Tag>(1);
+ tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+ return tags;
}
//
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index eba99e0..da9c57a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
@InterfaceAudience.Private
public class MultiVersionConcurrencyControl {
private static final Log LOG = LogFactory.getLog(MultiVersionConcurrencyControl.class);
+ static final long NO_WRITE_NUMBER = 0;
final AtomicLong readPoint = new AtomicLong(0);
final AtomicLong writePoint = new AtomicLong(0);
@@ -155,7 +156,7 @@ public class MultiVersionConcurrencyControl {
* changes completely) so we can clean up the outstanding transaction.
*
* How much is the read point advanced?
- *
+ *
* Let S be the set of all write numbers that are completed. Set the read point to the highest
* numbered write of S.
*
@@ -279,4 +280,4 @@ public class MultiVersionConcurrencyControl {
ClassSize.OBJECT +
2 * Bytes.SIZEOF_LONG +
2 * ClassSize.REFERENCE);
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 9ae72e6..e189a30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -281,8 +281,6 @@ public class FSHLog implements WAL {
private final int slowSyncNs;
- private final static Object [] NO_ARGS = new Object []{};
-
// If live datanode count is lower than the default replicas value,
// RollWriter will be triggered in each sync(So the RollWriter will be
// triggered one by one in a short time). Using it as a workaround to slow
@@ -509,16 +507,16 @@ public class FSHLog implements WAL {
FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
this.logrollsize =
(long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
-
+
float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY,
- conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY,
+ conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY,
HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE));
boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
if(maxLogsDefined){
LOG.warn("'hbase.regionserver.maxlogs' was deprecated.");
}
- this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
- Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize)));
+ this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
+ Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize)));
this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
this.lowReplicationRollLimit =
@@ -573,7 +571,7 @@ public class FSHLog implements WAL {
int maxLogs = Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize);
return maxLogs;
}
-
+
/**
* Get the backing files associated with this WAL.
* @return may be null if there are no files.
@@ -1086,8 +1084,6 @@ public class FSHLog implements WAL {
long sequence = this.disruptor.getRingBuffer().next();
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
- // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
- // edit with its edit/sequence id.
// TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
truck.loadPayload(entry, scope.detach());
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java
new file mode 100644
index 0000000..bf3a44f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+// import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.core.TimerContext;
+import com.yammer.metrics.stats.Snapshot;
+
+/**
+ * Simple Increments Performance Test. Run this from main. It is to go against a cluster.
+ * Presumption is the table exists already. Defaults are a zk ensemble of localhost:2181,
+ * a tableName of 'tableName', a column family name of 'columnFamilyName', with 80 threads by
+ * default and 10000 increments per thread. To change any of these configs, pass -DNAME=VALUE as
+ * in -DtableName="newTableName". It prints out configuration it is running with at the start and
+ * on the end it prints out percentiles and, if any increment failed, how many did.
+ */
+public class IncrementPerformanceTest implements Tool {
+  private static final Log LOG = LogFactory.getLog(IncrementPerformanceTest.class);
+  private static final byte [] QUALIFIER = new byte [] {'q'};
+  private Configuration conf;
+  private final MetricName metricName = new MetricName(this.getClass(), "increment");
+  // Each of the configuration keys below doubles as the default value for its setting.
+  private static final String TABLENAME = "tableName";
+  private static final String COLUMN_FAMILY = "columnFamilyName";
+  private static final String THREAD_COUNT = "threadCount";
+  private static final int DEFAULT_THREAD_COUNT = 80;
+  private static final String INCREMENT_COUNT = "incrementCount";
+  private static final int DEFAULT_INCREMENT_COUNT = 10000;
+
+  IncrementPerformanceTest() {}
+
+  /**
+   * Runs <code>threadCount</code> threads, each doing <code>incrementCount</code> single-column
+   * increments against its own table instance, then logs latency percentiles.
+   * @param args unused; settings are read from the Configuration
+   * @return always 0
+   * @throws Exception if a worker could not open its table or was interrupted
+   */
+  @Override
+  public int run(final String [] args) throws Exception {
+    Configuration conf = getConf();
+    // Fall back to the key itself when the table name is not configured.
+    final TableName tableName = TableName.valueOf(conf.get(TABLENAME, TABLENAME));
+    final byte [] columnFamilyName = Bytes.toBytes(conf.get(COLUMN_FAMILY, COLUMN_FAMILY));
+    int threadCount = conf.getInt(THREAD_COUNT, DEFAULT_THREAD_COUNT);
+    final int incrementCount = conf.getInt(INCREMENT_COUNT, DEFAULT_INCREMENT_COUNT);
+    LOG.info("Running test with " + HConstants.ZOOKEEPER_QUORUM + "=" +
+      getConf().get(HConstants.ZOOKEEPER_QUORUM) + ", tableName=" + tableName +
+      ", columnFamilyName=" + Bytes.toString(columnFamilyName) + ", threadCount=" + threadCount +
+      ", incrementCount=" + incrementCount);
+
+    // One timer for all workers; it is registered under metricName and read back at the end.
+    final Timer timer = Metrics.newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+    final AtomicInteger failures = new AtomicInteger(0);
+    ExecutorService service = Executors.newFixedThreadPool(threadCount);
+    try {
+      Set<Future<?>> futures = new HashSet<Future<?>>();
+      for (int t = 0; t < threadCount; t++) {
+        futures.add(service.submit(new Runnable() {
+          @Override
+          public void run() {
+            HTable table;
+            try {
+              table = new HTable(getConf(), tableName.getName());
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+            try {
+              for (int i = 0; i < incrementCount; i++) {
+                byte[] row = Bytes.toBytes(i);
+                TimerContext context = timer.time();
+                try {
+                  table.incrementColumnValue(row, columnFamilyName, QUALIFIER, 1L);
+                } catch (IOException e) {
+                  // Keep going..it's a test. Count it so the report can say timings include failures.
+                  failures.incrementAndGet();
+                } finally {
+                  context.stop();
+                }
+              }
+            } finally {
+              try {
+                table.close();
+              } catch (IOException e) {
+                LOG.warn("Failed closing table " + tableName, e);
+              }
+            }
+          }
+        }));
+      }
+
+      for (Future<?> future : futures) {
+        future.get();
+      }
+    } finally {
+      // Always release the pool, also when a worker failed, so its threads do not linger.
+      service.shutdown();
+    }
+    Snapshot s = timer.getSnapshot();
+    LOG.info(String.format("75th=%s, 95th=%s, 99th=%s", s.get75thPercentile(),
+      s.get95thPercentile(), s.get99thPercentile()));
+    if (failures.get() > 0) {
+      LOG.warn(failures.get() + " increments failed with an IOException; " +
+        "their latencies are included in the percentiles above");
+    }
+    return 0;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(HBaseConfiguration.create(), new IncrementPerformanceTest(), args));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 81253a5..28c354f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -3139,7 +3138,7 @@ public class TestFromClientSide {
equals(value, CellUtil.cloneValue(key)));
}
- private void assertIncrementKey(Cell key, byte [] row, byte [] family,
+ static void assertIncrementKey(Cell key, byte [] row, byte [] family,
byte [] qualifier, long value)
throws Exception {
assertTrue("Expected row [" + Bytes.toString(row) + "] " +
@@ -3363,7 +3362,7 @@ public class TestFromClientSide {
return stamps;
}
- private boolean equals(byte [] left, byte [] right) {
+ static boolean equals(byte [] left, byte [] right) {
if (left == null && right == null) return true;
if (left == null && right.length == 0) return true;
if (right == null && left.length == 0) return true;
@@ -4483,264 +4482,6 @@ public class TestFromClientSide {
}
@Test
- public void testIncrementWithDeletes() throws Exception {
- LOG.info("Starting testIncrementWithDeletes");
- final TableName TABLENAME =
- TableName.valueOf("testIncrementWithDeletes");
- Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
- final byte[] COLUMN = Bytes.toBytes("column");
-
- ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
- TEST_UTIL.flush(TABLENAME);
-
- Delete del = new Delete(ROW);
- ht.delete(del);
-
- ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
-
- Get get = new Get(ROW);
- Result r = ht.get(get);
- assertEquals(1, r.size());
- assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN)));
- }
-
- @Test
- public void testIncrementingInvalidValue() throws Exception {
- LOG.info("Starting testIncrementingInvalidValue");
- final TableName TABLENAME = TableName.valueOf("testIncrementingInvalidValue");
- Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
- final byte[] COLUMN = Bytes.toBytes("column");
- Put p = new Put(ROW);
- // write an integer here (not a Long)
- p.add(FAMILY, COLUMN, Bytes.toBytes(5));
- ht.put(p);
- try {
- ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
- fail("Should have thrown DoNotRetryIOException");
- } catch (DoNotRetryIOException iox) {
- // success
- }
- Increment inc = new Increment(ROW);
- inc.addColumn(FAMILY, COLUMN, 5);
- try {
- ht.increment(inc);
- fail("Should have thrown DoNotRetryIOException");
- } catch (DoNotRetryIOException iox) {
- // success
- }
- }
-
- @Test
- public void testIncrementInvalidArguments() throws Exception {
- LOG.info("Starting testIncrementInvalidArguments");
- final TableName TABLENAME = TableName.valueOf("testIncrementInvalidArguments");
- Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
- final byte[] COLUMN = Bytes.toBytes("column");
- try {
- // try null row
- ht.incrementColumnValue(null, FAMILY, COLUMN, 5);
- fail("Should have thrown IOException");
- } catch (IOException iox) {
- // success
- }
- try {
- // try null family
- ht.incrementColumnValue(ROW, null, COLUMN, 5);
- fail("Should have thrown IOException");
- } catch (IOException iox) {
- // success
- }
- try {
- // try null qualifier
- ht.incrementColumnValue(ROW, FAMILY, null, 5);
- fail("Should have thrown IOException");
- } catch (IOException iox) {
- // success
- }
- // try null row
- try {
- Increment incNoRow = new Increment((byte [])null);
- incNoRow.addColumn(FAMILY, COLUMN, 5);
- fail("Should have thrown IllegalArgumentException");
- } catch (IllegalArgumentException iax) {
- // success
- } catch (NullPointerException npe) {
- // success
- }
- // try null family
- try {
- Increment incNoFamily = new Increment(ROW);
- incNoFamily.addColumn(null, COLUMN, 5);
- fail("Should have thrown IllegalArgumentException");
- } catch (IllegalArgumentException iax) {
- // success
- }
- // try null qualifier
- try {
- Increment incNoQualifier = new Increment(ROW);
- incNoQualifier.addColumn(FAMILY, null, 5);
- fail("Should have thrown IllegalArgumentException");
- } catch (IllegalArgumentException iax) {
- // success
- }
- }
-
- @Test
- public void testIncrementOutOfOrder() throws Exception {
- LOG.info("Starting testIncrementOutOfOrder");
- final TableName TABLENAME = TableName.valueOf("testIncrementOutOfOrder");
- Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
-
- byte [][] QUALIFIERS = new byte [][] {
- Bytes.toBytes("B"), Bytes.toBytes("A"), Bytes.toBytes("C")
- };
-
- Increment inc = new Increment(ROW);
- for (int i=0; i<QUALIFIERS.length; i++) {
- inc.addColumn(FAMILY, QUALIFIERS[i], 1);
- }
- ht.increment(inc);
-
- // Verify expected results
- Result r = ht.get(new Get(ROW));
- Cell [] kvs = r.rawCells();
- assertEquals(3, kvs.length);
- assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 1);
- assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 1);
- assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1);
-
- // Now try multiple columns again
- inc = new Increment(ROW);
- for (int i=0; i<QUALIFIERS.length; i++) {
- inc.addColumn(FAMILY, QUALIFIERS[i], 1);
- }
- ht.increment(inc);
-
- // Verify
- r = ht.get(new Get(ROW));
- kvs = r.rawCells();
- assertEquals(3, kvs.length);
- assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 2);
- assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 2);
- assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2);
- }
-
- @Test
- public void testIncrementOnSameColumn() throws Exception {
- LOG.info("Starting testIncrementOnSameColumn");
- final byte[] TABLENAME = Bytes.toBytes("testIncrementOnSameColumn");
- HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
-
- byte[][] QUALIFIERS =
- new byte[][] { Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") };
-
- Increment inc = new Increment(ROW);
- for (int i = 0; i < QUALIFIERS.length; i++) {
- inc.addColumn(FAMILY, QUALIFIERS[i], 1);
- inc.addColumn(FAMILY, QUALIFIERS[i], 1);
- }
- ht.increment(inc);
-
- // Verify expected results
- Result r = ht.get(new Get(ROW));
- Cell[] kvs = r.rawCells();
- assertEquals(3, kvs.length);
- assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1);
- assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 1);
- assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1);
-
- // Now try multiple columns again
- inc = new Increment(ROW);
- for (int i = 0; i < QUALIFIERS.length; i++) {
- inc.addColumn(FAMILY, QUALIFIERS[i], 1);
- inc.addColumn(FAMILY, QUALIFIERS[i], 1);
- }
- ht.increment(inc);
-
- // Verify
- r = ht.get(new Get(ROW));
- kvs = r.rawCells();
- assertEquals(3, kvs.length);
- assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 2);
- assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 2);
- assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2);
-
- ht.close();
- }
-
- @Test
- public void testIncrement() throws Exception {
- LOG.info("Starting testIncrement");
- final TableName TABLENAME = TableName.valueOf("testIncrement");
- Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
-
- byte [][] ROWS = new byte [][] {
- Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
- Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
- Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i")
- };
- byte [][] QUALIFIERS = new byte [][] {
- Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
- Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
- Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i")
- };
-
- // Do some simple single-column increments
-
- // First with old API
- ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[0], 1);
- ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[1], 2);
- ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[2], 3);
- ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[3], 4);
-
- // Now increment things incremented with old and do some new
- Increment inc = new Increment(ROW);
- inc.addColumn(FAMILY, QUALIFIERS[1], 1);
- inc.addColumn(FAMILY, QUALIFIERS[3], 1);
- inc.addColumn(FAMILY, QUALIFIERS[4], 1);
- ht.increment(inc);
-
- // Verify expected results
- Result r = ht.get(new Get(ROW));
- Cell [] kvs = r.rawCells();
- assertEquals(5, kvs.length);
- assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1);
- assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 3);
- assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 3);
- assertIncrementKey(kvs[3], ROW, FAMILY, QUALIFIERS[3], 5);
- assertIncrementKey(kvs[4], ROW, FAMILY, QUALIFIERS[4], 1);
-
- // Now try multiple columns by different amounts
- inc = new Increment(ROWS[0]);
- for (int i=0;i<QUALIFIERS.length;i++) {
- inc.addColumn(FAMILY, QUALIFIERS[i], i+1);
- }
- ht.increment(inc);
- // Verify
- r = ht.get(new Get(ROWS[0]));
- kvs = r.rawCells();
- assertEquals(QUALIFIERS.length, kvs.length);
- for (int i=0;i<QUALIFIERS.length;i++) {
- assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], i+1);
- }
-
- // Re-increment them
- inc = new Increment(ROWS[0]);
- for (int i=0;i<QUALIFIERS.length;i++) {
- inc.addColumn(FAMILY, QUALIFIERS[i], i+1);
- }
- ht.increment(inc);
- // Verify
- r = ht.get(new Get(ROWS[0]));
- kvs = r.rawCells();
- assertEquals(QUALIFIERS.length, kvs.length);
- for (int i=0;i<QUALIFIERS.length;i++) {
- assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2*(i+1));
- }
- }
-
-
- @Test
public void testClientPoolRoundRobin() throws IOException {
final TableName tableName = TableName.valueOf("testClientPoolRoundRobin");
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index a0a8747..09c7e86 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.util.ArrayList;
@@ -32,14 +31,14 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java
index ae96849..66fb69c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideNoCodec.java
@@ -99,4 +99,4 @@ public class TestFromClientSideNoCodec {
String codec = AbstractRpcClient.getDefaultCodec(c);
assertTrue(codec == null || codec.length() == 0);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
index 2671af7..cd2409e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
@@ -27,7 +27,7 @@ import org.junit.experimental.categories.Category;
/**
* Test all client operations with a coprocessor that
- * just implements the default flush/compact/scan policy
+ * just implements the default flush/compact/scan policy.
*/
@Category(LargeTests.class)
public class TestFromClientSideWithCoprocessor extends TestFromClientSide {
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java
new file mode 100644
index 0000000..a67cc45
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementFromClientSideWithCoprocessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.Before;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs all the {@link Increment} client tests of the parent class with region coprocessors
+ * configured, among them one that just implements the default flush/compact/scan policy.
+ *
+ * This test takes a long time. The test it derives from is parameterized so we run through both
+ * options of the test.
+ */
+@Category(LargeTests.class)
+public class TestIncrementFromClientSideWithCoprocessor extends TestIncrementsFromClientSide {
+  /**
+   * @param fast the parameter handed through, unchanged, to the parent test
+   */
+  public TestIncrementFromClientSideWithCoprocessor(final boolean fast) {
+    super(fast);
+  }
+
+  /**
+   * Sets the region coprocessors and table sanity checks in the shared test configuration,
+   * then runs the parent's setup.
+   */
+  @Before
+  public void before() throws Exception {
+    final Configuration configuration = TEST_UTIL.getConfiguration();
+    final String endpointName = MultiRowMutationEndpoint.class.getName();
+    final String observerName = NoOpScanPolicyObserver.class.getName();
+    configuration.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      endpointName, observerName);
+    // Sanity checks are switched on before the parent's setup gets to run.
+    configuration.setBoolean("hbase.table.sanity.checks", true);
+    super.before();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
new file mode 100644
index 0000000..54a54a0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
@@ -0,0 +1,433 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Run Increment tests that use the HBase clients; {@link HTable}.
+ *
+ * Test is parameterized to run the slow and fast increment code paths. If fast, in the @before, we
+ * do a rolling restart of the single regionserver so that it can pick up the go fast configuration.
+ * Doing it this way should be faster than starting/stopping a cluster per test.
+ *
+ * Test takes a long time because, when fast, we restart the regionserver before each test -- ugh.
+ */
+@RunWith(Parameterized.class)
+@Category(LargeTests.class)
+@SuppressWarnings ("deprecation")
+public class TestIncrementsFromClientSide {
+ // Per-instance logger, created from the runtime class so subclasses log under their own name.
+ final Log LOG = LogFactory.getLog(getClass());
+ // Testing utility shared by every test method and by subclasses.
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ // Row and column family used by the tests below.
+ private static byte [] ROW = Bytes.toBytes("testRow");
+ private static byte [] FAMILY = Bytes.toBytes("testFamily");
+ // This test depends on there being only one slave running at a time. See the @Before
+ // method where we do a rolling restart.
+ protected static int SLAVES = 1;
+ // Value the fast-increment configuration key held before the @Before method overwrote it; the
+ // @After method reads it back. Only assigned when the test runs with fast == true.
+ private String oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY;
+ // Supplies the current test method name, used for logging and for naming each test's table.
+ @Rule public TestName name = new TestName();
+ /**
+  * @return the two parameter sets for this test, in order: {@code fast=false}, then
+  *   {@code fast=true}
+  */
+ @Parameters(name = "fast={0}")
+ public static Collection<Object []> data() {
+   final Object [] slowRun = new Object [] {Boolean.FALSE};
+   final Object [] fastRun = new Object [] {Boolean.TRUE};
+   return Arrays.asList(slowRun, fastRun);
+ }
+ // Which parameterization this instance runs; supplied by data() through the constructor.
+ private final boolean fast;
+
+ /**
+  * @param fast when true, the @Before method turns on
+  *   HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY and restarts the regionserver, and the
+  *   tests read results back with READ_UNCOMMITTED isolation
+  */
+ public TestIncrementsFromClientSide(final boolean fast) {
+ this.fast = fast;
+ }
+
+ /**
+  * Starts the mini cluster shared by all test methods, with {@link MultiRowMutationEndpoint}
+  * configured as a region coprocessor and table sanity checks switched on.
+  */
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+   final Configuration configuration = TEST_UTIL.getConfiguration();
+   final String endpointClassName = MultiRowMutationEndpoint.class.getName();
+   configuration.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, endpointClassName);
+   configuration.setBoolean("hbase.table.sanity.checks", true); // enable for below tests
+   // Start with SLAVES (one) regionserver; the rolling restart in before() relies on that count.
+   TEST_UTIL.startMiniCluster(SLAVES);
+ }
+
+ /**
+  * Per-test setup. Does nothing for the slow parameterization. For the fast one it records the
+  * current value of the fast-increment key, switches the key on, and does a rolling restart of
+  * the single regionserver (start a new one, stop the old one) so the new setting is picked up,
+  * then waits for regions to settle.
+  */
+ @Before
+ public void before() throws Exception {
+   final Configuration configuration = TEST_UTIL.getConfiguration();
+   if (!this.fast) {
+     // Slow runs use the cluster exactly as beforeClass() started it.
+     return;
+   }
+   // Remember the current setting so after() can put it back.
+   this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY =
+       configuration.get(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY);
+   configuration.setBoolean(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY, this.fast);
+   // Doing a rolling restart should be faster than starting and stopping a cluster per test.
+   final HRegionServer outgoing =
+       TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0).getRegionServer();
+   TEST_UTIL.getHBaseCluster().startRegionServer();
+   outgoing.stop("Restart");
+   while (!outgoing.isStopped()) {
+     Threads.sleep(100);
+     LOG.info("Restarting " + outgoing);
+   }
+   TEST_UTIL.waitUntilNoRegionsInTransition(10000);
+ }
+
+ /**
+  * Per-test teardown. For the fast parameterization, puts the fast-increment key in the shared
+  * test configuration back to the state before() found it in: the saved value if there was one,
+  * otherwise unset. Only the Configuration object is touched; no regionserver is restarted here.
+  */
+ @After
+ public void after() throws Exception {
+   if (!this.fast) {
+     // before() changed nothing for slow runs, so there is nothing to restore.
+     return;
+   }
+   Configuration conf = TEST_UTIL.getConfiguration();
+   if (this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY != null) {
+     conf.set(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY,
+         this.oldINCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY);
+   } else {
+     // The key was not set before before() ran; remove it rather than leave the flag on.
+     conf.unset(HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY);
+   }
+ }
+
+ /**
+ * Shuts down the mini cluster that beforeClass() started, once all test methods have run.
+ *
+ * @throws java.lang.Exception if the shutdown fails
+ */
+ @AfterClass
+ public static void afterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+  * Increments a column by 5, flushes, deletes the row, increments by 5 again and then expects a
+  * single cell holding 5; i.e. the second increment does not see the deleted value.
+  */
+ @Test
+ public void testIncrementWithDeletes() throws Exception {
+   LOG.info("Starting " + this.name.getMethodName());
+   final TableName TABLENAME =
+       TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName()));
+   Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+   try {
+     final byte[] COLUMN = Bytes.toBytes("column");
+
+     ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
+     TEST_UTIL.flush(TABLENAME);
+
+     Delete del = new Delete(ROW);
+     ht.delete(del);
+
+     ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
+
+     Get get = new Get(ROW);
+     if (this.fast) {
+       get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+     }
+     Result r = ht.get(get);
+     assertEquals(1, r.size());
+     assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN)));
+   } finally {
+     // Release the table handle even when an assertion above fails.
+     ht.close();
+   }
+ }
+
+ /**
+  * Stores an int-encoded value in a column and verifies that incrementing that column is
+  * rejected with a {@link DoNotRetryIOException}, both through {@code incrementColumnValue} and
+  * through an {@link Increment}.
+  */
+ @Test
+ public void testIncrementingInvalidValue() throws Exception {
+   LOG.info("Starting " + this.name.getMethodName());
+   final TableName TABLENAME =
+       TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName()));
+   Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+   try {
+     final byte[] COLUMN = Bytes.toBytes("column");
+     Put p = new Put(ROW);
+     // write an integer here (not a Long)
+     p.add(FAMILY, COLUMN, Bytes.toBytes(5));
+     ht.put(p);
+     try {
+       ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
+       fail("Should have thrown DoNotRetryIOException");
+     } catch (DoNotRetryIOException iox) {
+       // success
+     }
+     Increment inc = new Increment(ROW);
+     inc.addColumn(FAMILY, COLUMN, 5);
+     try {
+       ht.increment(inc);
+       fail("Should have thrown DoNotRetryIOException");
+     } catch (DoNotRetryIOException iox) {
+       // success
+     }
+   } finally {
+     // Release the table handle even when an assertion above fails.
+     ht.close();
+   }
+ }
+
+ /**
+  * Verifies argument checking: {@code incrementColumnValue} must throw {@link IOException} for
+  * a null row, family or qualifier, and building an {@link Increment} with a null row, family
+  * or qualifier must throw {@link IllegalArgumentException} (a {@link NullPointerException} is
+  * also accepted for the null row).
+  */
+ @Test
+ public void testIncrementInvalidArguments() throws Exception {
+   LOG.info("Starting " + this.name.getMethodName());
+   final TableName TABLENAME =
+       TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName()));
+   Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+   try {
+     final byte[] COLUMN = Bytes.toBytes("column");
+     try {
+       // try null row
+       ht.incrementColumnValue(null, FAMILY, COLUMN, 5);
+       fail("Should have thrown IOException");
+     } catch (IOException iox) {
+       // success
+     }
+     try {
+       // try null family
+       ht.incrementColumnValue(ROW, null, COLUMN, 5);
+       fail("Should have thrown IOException");
+     } catch (IOException iox) {
+       // success
+     }
+     try {
+       // try null qualifier
+       ht.incrementColumnValue(ROW, FAMILY, null, 5);
+       fail("Should have thrown IOException");
+     } catch (IOException iox) {
+       // success
+     }
+     // try null row
+     try {
+       Increment incNoRow = new Increment((byte [])null);
+       incNoRow.addColumn(FAMILY, COLUMN, 5);
+       fail("Should have thrown IllegalArgumentException");
+     } catch (IllegalArgumentException iax) {
+       // success
+     } catch (NullPointerException npe) {
+       // success
+     }
+     // try null family
+     try {
+       Increment incNoFamily = new Increment(ROW);
+       incNoFamily.addColumn(null, COLUMN, 5);
+       fail("Should have thrown IllegalArgumentException");
+     } catch (IllegalArgumentException iax) {
+       // success
+     }
+     // try null qualifier
+     try {
+       Increment incNoQualifier = new Increment(ROW);
+       incNoQualifier.addColumn(FAMILY, null, 5);
+       fail("Should have thrown IllegalArgumentException");
+     } catch (IllegalArgumentException iax) {
+       // success
+     }
+   } finally {
+     // Release the table handle even when an assertion above fails.
+     ht.close();
+   }
+ }
+
+ /**
+  * Adds columns to an {@link Increment} in the order B, A, C and checks that the row reads back
+  * as A, B, C with the expected counts, after one increment and again after a second.
+  */
+ @Test
+ public void testIncrementOutOfOrder() throws Exception {
+   LOG.info("Starting " + this.name.getMethodName());
+   final TableName TABLENAME =
+       TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName()));
+   Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+   try {
+     byte [][] QUALIFIERS = new byte [][] {
+       Bytes.toBytes("B"), Bytes.toBytes("A"), Bytes.toBytes("C")
+     };
+
+     Increment inc = new Increment(ROW);
+     for (byte [] qualifier : QUALIFIERS) {
+       inc.addColumn(FAMILY, qualifier, 1);
+     }
+     ht.increment(inc);
+
+     // Verify expected results
+     Get get = new Get(ROW);
+     if (this.fast) {
+       get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+     }
+     Result r = ht.get(get);
+     Cell [] kvs = r.rawCells();
+     assertEquals(3, kvs.length);
+     // Cells are expected in qualifier order A, B, C, i.e. QUALIFIERS[1], [0], [2].
+     assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 1);
+     assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 1);
+     assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1);
+
+     // Now try multiple columns again
+     inc = new Increment(ROW);
+     for (byte [] qualifier : QUALIFIERS) {
+       inc.addColumn(FAMILY, qualifier, 1);
+     }
+     ht.increment(inc);
+
+     // Verify
+     r = ht.get(get);
+     kvs = r.rawCells();
+     assertEquals(3, kvs.length);
+     assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[1], 2);
+     assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[0], 2);
+     assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2);
+   } finally {
+     // Release the table handle even when an assertion above fails.
+     ht.close();
+   }
+ }
+
+ /**
+  * Adds every column to the same {@link Increment} twice (amount 1 each time) and expects each
+  * column to read back as 1 after the first increment and 2 after the second.
+  */
+ @Test
+ public void testIncrementOnSameColumn() throws Exception {
+   LOG.info("Starting " + this.name.getMethodName());
+   final byte[] TABLENAME = Bytes.toBytes(filterStringSoTableNameSafe(this.name.getMethodName()));
+   HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+   try {
+     byte[][] QUALIFIERS =
+         new byte[][] { Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") };
+
+     Increment inc = new Increment(ROW);
+     for (byte[] qualifier : QUALIFIERS) {
+       inc.addColumn(FAMILY, qualifier, 1);
+       inc.addColumn(FAMILY, qualifier, 1);
+     }
+     ht.increment(inc);
+
+     // Verify expected results
+     Get get = new Get(ROW);
+     if (this.fast) {
+       get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+     }
+     Result r = ht.get(get);
+     Cell[] kvs = r.rawCells();
+     assertEquals(3, kvs.length);
+     assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1);
+     assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 1);
+     assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 1);
+
+     // Now try multiple columns again
+     inc = new Increment(ROW);
+     for (byte[] qualifier : QUALIFIERS) {
+       inc.addColumn(FAMILY, qualifier, 1);
+       inc.addColumn(FAMILY, qualifier, 1);
+     }
+     ht.increment(inc);
+
+     // Verify
+     r = ht.get(get);
+     kvs = r.rawCells();
+     assertEquals(3, kvs.length);
+     assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 2);
+     assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 2);
+     assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 2);
+   } finally {
+     // Close in finally so the table is released even when an assertion above fails.
+     ht.close();
+   }
+ }
+
+ /**
+  * General increment test. Mixes {@code incrementColumnValue} calls with {@link Increment}s on
+  * one row, then increments nine columns of another row by different amounts, re-increments
+  * them, and finally increments by zero, checking the stored counts after every step.
+  */
+ @Test
+ public void testIncrement() throws Exception {
+   LOG.info("Starting " + this.name.getMethodName());
+   final TableName TABLENAME =
+       TableName.valueOf(filterStringSoTableNameSafe(this.name.getMethodName()));
+   Table ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+   try {
+     byte [][] ROWS = new byte [][] {
+       Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
+       Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
+       Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i")
+     };
+     byte [][] QUALIFIERS = new byte [][] {
+       Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
+       Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
+       Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i")
+     };
+
+     // Do some simple single-column increments
+
+     // First with old API
+     ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[0], 1);
+     ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[1], 2);
+     ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[2], 3);
+     ht.incrementColumnValue(ROW, FAMILY, QUALIFIERS[3], 4);
+
+     // Now increment things incremented with old and do some new
+     Increment inc = new Increment(ROW);
+     inc.addColumn(FAMILY, QUALIFIERS[1], 1);
+     inc.addColumn(FAMILY, QUALIFIERS[3], 1);
+     inc.addColumn(FAMILY, QUALIFIERS[4], 1);
+     ht.increment(inc);
+
+     // Verify expected results
+     Get get = new Get(ROW);
+     if (this.fast) {
+       get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+     }
+     Result r = ht.get(get);
+     Cell [] kvs = r.rawCells();
+     assertEquals(5, kvs.length);
+     assertIncrementKey(kvs[0], ROW, FAMILY, QUALIFIERS[0], 1);
+     assertIncrementKey(kvs[1], ROW, FAMILY, QUALIFIERS[1], 3);
+     assertIncrementKey(kvs[2], ROW, FAMILY, QUALIFIERS[2], 3);
+     assertIncrementKey(kvs[3], ROW, FAMILY, QUALIFIERS[3], 5);
+     assertIncrementKey(kvs[4], ROW, FAMILY, QUALIFIERS[4], 1);
+
+     // Now try multiple columns by different amounts
+     inc = new Increment(ROWS[0]);
+     for (int i = 0; i < QUALIFIERS.length; i++) {
+       inc.addColumn(FAMILY, QUALIFIERS[i], i + 1);
+     }
+     ht.increment(inc);
+     // Verify
+     get = new Get(ROWS[0]);
+     if (this.fast) {
+       get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+     }
+     r = ht.get(get);
+     kvs = r.rawCells();
+     assertEquals(QUALIFIERS.length, kvs.length);
+     for (int i = 0; i < QUALIFIERS.length; i++) {
+       assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], i + 1);
+     }
+
+     // Re-increment them
+     inc = new Increment(ROWS[0]);
+     for (int i = 0; i < QUALIFIERS.length; i++) {
+       inc.addColumn(FAMILY, QUALIFIERS[i], i + 1);
+     }
+     ht.increment(inc);
+     // Verify
+     r = ht.get(get);
+     kvs = r.rawCells();
+     assertEquals(QUALIFIERS.length, kvs.length);
+     for (int i = 0; i < QUALIFIERS.length; i++) {
+       assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2 * (i + 1));
+     }
+
+     // Verify that an Increment of an amount of zero, returns current count; i.e. same as for
+     // above test, that is: 2 * (i + 1).
+     inc = new Increment(ROWS[0]);
+     for (byte [] qualifier : QUALIFIERS) {
+       inc.addColumn(FAMILY, qualifier, 0);
+     }
+     ht.increment(inc);
+     r = ht.get(get);
+     kvs = r.rawCells();
+     assertEquals(QUALIFIERS.length, kvs.length);
+     for (int i = 0; i < QUALIFIERS.length; i++) {
+       assertIncrementKey(kvs[i], ROWS[0], FAMILY, QUALIFIERS[i], 2 * (i + 1));
+     }
+   } finally {
+     // Release the table handle even when an assertion above fails.
+     ht.close();
+   }
+ }
+
+
+ /**
+ * Call over to the adjacent class's method of same name; this is a pure pass-through to
+ * TestFromClientSide.assertIncrementKey with the arguments unchanged.
+ *
+ * @param key the cell read back from the table
+ * @param row expected row
+ * @param family expected column family
+ * @param qualifier expected column qualifier
+ * @param value expected counter value
+ */
+ static void assertIncrementKey(Cell key, byte [] row, byte [] family,
+ byte [] qualifier, long value) throws Exception {
+ TestFromClientSide.assertIncrementKey(key, row, family, qualifier, value);
+ }
+
+ /**
+  * Rewrites any {@code [fast=VALUE]} section of the given string as {@code .FAST.is.VALUE}.
+  * The tests pass their method name through this before using it as a table name.
+  *
+  * @param str string to filter, typically the current test method name
+  * @return {@code str} with the bracketed section replaced; unchanged if there is none
+  */
+ public static String filterStringSoTableNameSafe(final String str) {
+   final String bracketedParameter = "\\[fast\\=(.*)\\]";
+   final String tableNameSafeForm = ".FAST.is.$1";
+   return str.replaceAll(bracketedParameter, tableNameSafeForm);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/930f68c0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
index 8e451cd..ab53e3e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -32,7 +31,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
@@ -237,7 +235,8 @@ public abstract class TestTableInputFormatScanBase {
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
job.setReducerClass(ScanReducer.class);
job.setNumReduceTasks(1); // one to get final "first" and "last" key
- FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ FileOutputFormat.setOutputPath(job,
+ new Path(TEST_UTIL.getDataTestDir(), job.getJobName()));
LOG.info("Started " + job.getJobName());
assertTrue(job.waitForCompletion(true));
LOG.info("After map/reduce completion - job " + jobName);