You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2014/08/01 00:08:25 UTC
[49/50] [abbrv] git commit: Revert "[HBASE-11233] Asynchronous multiputs"
Revert "[HBASE-11233] Asynchronous multiputs"
Summary: This reverts commit fb2b5adfb05266cb77619c1544d9d6463feabcf0.
Test Plan: simple revert.
Reviewers: rshroff
Reviewed By: rshroff
Subscribers: hbase-eng@
Differential Revision: https://phabricator.fb.com/D1408564
Tasks: 4584691
git-svn-id: svn+ssh://tubbs/svnhive/hadoop/branches/titan/VENDOR.hbase/hbase-trunk@43193 e7acf4d4-3532-417f-9e73-7a9ae25a1f51
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da748d43
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da748d43
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da748d43
Branch: refs/heads/0.89-fb
Commit: da748d43de8f67edabc7b5a12729099a2bef8fb6
Parents: 5808b33
Author: manukranthk <ma...@e7acf4d4-3532-417f-9e73-7a9ae25a1f51>
Authored: Fri Jun 27 19:07:43 2014 +0000
Committer: Elliott Clark <el...@fb.com>
Committed: Thu Jul 31 14:52:44 2014 -0700
----------------------------------------------------------------------
.../hbase/master/TimeToLiveLogCleaner.java | 5 +-
.../hadoop/hbase/regionserver/HRegion.java | 266 ++++++-------------
.../hbase/regionserver/HRegionServer.java | 102 +++----
.../MultiVersionConsistencyControl.java | 3 +
.../hadoop/hbase/regionserver/wal/HLog.java | 222 ++++++----------
.../hadoop/hbase/HBaseTestingUtility.java | 4 -
.../hadoop/hbase/client/TestAsyncMultiputs.java | 185 -------------
.../org/apache/hadoop/hbase/client/TestHCM.java | 9 +-
.../hadoop/hbase/master/TestOldLogsCleaner.java | 5 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 9 +-
.../wal/TestAsyncMultiputsFailingWal.java | 58 ----
.../hadoop/hbase/regionserver/wal/TestHLog.java | 52 +---
.../hbase/regionserver/wal/TestHLogSplit.java | 6 +-
.../wal/TestLogActionsListener.java | 2 +-
.../hbase/regionserver/wal/TestWALReplay.java | 9 +-
15 files changed, 229 insertions(+), 708 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java b/src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java
index fce153e..cc9e571 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/TimeToLiveLogCleaner.java
@@ -48,9 +48,8 @@ public class TimeToLiveLogCleaner implements LogCleanerDelegate {
long currentTime = System.currentTimeMillis();
try {
// If the path name is in hourly format, skip getting modification time
- if (OldLogsCleaner.isOldLogsArchivedToHourlyDir()
- && OldLogsCleaner.isMatchDatePattern(filePath)) {
- time = HLog.getDateFormat().parse(filePath.getName()).getTime();
+ if (OldLogsCleaner.isOldLogsArchivedToHourlyDir() && OldLogsCleaner.isMatchDatePattern(filePath)) {
+ time = HLog.DATE_FORMAT.parse(filePath.getName()).getTime();
} else {
FileStatus fStat = filePath.getFileSystem(conf).getFileStatus(filePath);
time = fStat.getModificationTime();
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4f7b42c..2511c8e 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -56,8 +56,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.annotation.Nullable;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -108,14 +106,9 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@@ -1981,7 +1974,7 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
WALEdit walEdit = new WALEdit();
addFamilyMapToWALEdit(familyMap, walEdit);
seqNum = this.log.append(regionInfo,
- regionInfo.getTableDesc().getName(), walEdit, now).checkedGet();
+ regionInfo.getTableDesc().getName(), walEdit, now);
}
// Now make changes to the memstore.
@@ -2062,7 +2055,7 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
try {
// All edits for the given row (across all column families) must happen atomically.
- put(put.getFamilyMap(), writeToWAL).checkedGet();
+ put(put.getFamilyMap(), writeToWAL);
} finally {
if (lockid == null) releaseRowLock(lid);
}
@@ -2097,28 +2090,14 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
* Perform a batch put with no pre-specified locks
* @see HRegion#put(Pair[])
*/
- public CheckedFuture<OperationStatusCode[], IOException> put(Put[] puts) throws IOException {
+ public OperationStatusCode[] put(Put[] puts) throws IOException {
@SuppressWarnings("unchecked")
Pair<Mutation, Integer> putsAndLocks[] = new Pair[puts.length];
for (int i = 0; i < puts.length; i++) {
putsAndLocks[i] = new Pair<Mutation, Integer>(puts[i], null);
}
- return Futures.makeChecked(batchMutateWithLocks(putsAndLocks, "multiput_"),
- getExceptionMapperFunction());
- }
-
- public Function<Exception, IOException> getExceptionMapperFunction() {
- return new Function<Exception, IOException>() {
- @Override
- @Nullable
- public IOException apply(@Nullable Exception e) {
- if (e instanceof ExecutionException) {
- return new IOException(e.getCause());
- }
- return new IOException(e);
- }
- };
+ return batchMutateWithLocks(putsAndLocks, "multiput_");
}
/**
@@ -2127,55 +2106,33 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
* @param methodName "multiput_/multidelete_" to update metrics correctly.
* @throws IOException
*/
- public ListenableFuture<OperationStatusCode[]> batchMutateWithLocks(
- Pair<Mutation, Integer>[] putsAndLocks, String methodName)
- throws IOException {
+ public OperationStatusCode[] batchMutateWithLocks(Pair<Mutation, Integer>[] putsAndLocks,
+ String methodName) throws IOException {
BatchOperationInProgress<Pair<Mutation, Integer>> batchOp =
new BatchOperationInProgress<Pair<Mutation,Integer>>(putsAndLocks);
- return batchMutateWithLazyRetry(batchOp, methodName);
- }
- public ListenableFuture<OperationStatusCode[]> batchMutateWithLazyRetry(
- final BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
- final String methodName) throws IOException {
-
- if (!batchOp.isDone()) {
+ while (!batchOp.isDone()) {
checkReadOnly();
checkResources();
+ long newSize;
splitsAndClosesLock.readLock().lock();
try {
doPreMutationHook(batchOp);
- return Futures.transform(doMiniBatchOp(batchOp, methodName),
- new AsyncFunction<Long, OperationStatusCode[]>() {
- @Override
- public ListenableFuture<OperationStatusCode[]> apply(Long addedSize)
- throws Exception {
- splitsAndClosesLock.readLock().lock();
- try {
- long newSize = incMemoryUsage(addedSize);
- if (isFlushSize(newSize)) {
- requestFlush();
- }
- } finally {
- splitsAndClosesLock.readLock().unlock();
- }
- if (batchOp.isDone()) {
- HRegion.writeOps.incrementAndGet();
- return Futures.immediateFuture(batchOp.retCodes);
- }
- return batchMutateWithLazyRetry(batchOp, methodName);
- }
- });
- } catch (Exception e) {
- return Futures.immediateFuture(batchOp.retCodes);
+ long addedSize = doMiniBatchOp(batchOp, methodName);
+ newSize = this.incMemoryUsage(addedSize);
} finally {
splitsAndClosesLock.readLock().unlock();
}
+ if (isFlushSize(newSize)) {
+ requestFlush();
+ }
}
- return Futures.immediateFuture(batchOp.retCodes);
+ HRegion.writeOps.incrementAndGet();
+ return batchOp.retCodes;
}
+
/**
* Execute coprocessor hooks for mutations
* TODO: adela currently can work only with prePut and we need to implement the rest..
@@ -2208,26 +2165,23 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
}
}
- private ListenableFuture<Long> doMiniBatchOp(
- final BatchOperationInProgress <Pair<Mutation, Integer>> batchOp,
- final String methodNameForMetricsUpdate)
- throws IOException {
- boolean preWallEditSuccessfull = false;
+ private long doMiniBatchOp(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp,
+ String methodNameForMetricsUpdate) throws IOException {
String signature = null;
// variable to note if all Put items are for the same CF -- metrics related
boolean isSignatureClear = true;
- final long now = EnvironmentEdgeManager.currentTimeMillis();
+ long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
boolean locked = false;
/** Keep track of the locks we hold so we can release them in finally clause */
- final List<Integer> acquiredLocks =
- Lists.newArrayListWithCapacity(batchOp.operations.length);
+ List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
- final int firstIndex = batchOp.nextIndexToProcess;
+ int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex;
+ boolean success = false;
try {
// ------------------------------------
// STEP 1. Try to acquire as many locks as we can, and ensure
@@ -2239,8 +2193,7 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
Integer currentLockID = null;
while (lastIndexExclusive < batchOp.operations.length) {
- Pair<Mutation, Integer> nextPair =
- batchOp.operations[lastIndexExclusive];
+ Pair<Mutation, Integer> nextPair = batchOp.operations[lastIndexExclusive];
Mutation op = nextPair.getFirst();
Integer providedLockId = nextPair.getSecond();
@@ -2291,9 +2244,6 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
}
}
}
-
- final int finalLastIndexExclusive = lastIndexExclusive;
-
// We've now grabbed as many puts off the list as we can
assert numReadyToWrite > 0;
@@ -2303,7 +2253,7 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
// ------------------------------------
// STEP 2. Update any LATEST_TIMESTAMP timestamps
// ----------------------------------
- for (int i = firstIndex; i < finalLastIndexExclusive; i++) {
+ for (int i = firstIndex; i < lastIndexExclusive; i++) {
Mutation op = batchOp.operations[i].getFirst();
if (op instanceof Put) {
@@ -2317,14 +2267,13 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
}
}
- ListenableFuture<Long> wallEditFuture =
- Futures.immediateFuture(-1L);
+ long seqNum = -1;
// ------------------------------------
// STEP 3. Write to WAL
// ----------------------------------
if (!this.disableWAL) {
WALEdit walEdit = new WALEdit();
- for (int i = firstIndex; i < finalLastIndexExclusive; i++) {
+ for (int i = firstIndex; i < lastIndexExclusive; i++) {
// Skip puts that were determined to be invalid during preprocessing
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
@@ -2334,80 +2283,48 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
}
// Append the edit to WAL
- wallEditFuture = this.log.append(regionInfo,
- regionInfo.getTableDesc().getName(),
- walEdit, now);
+ seqNum = this.log.append(regionInfo,
+ regionInfo.getTableDesc().getName(),
+ walEdit, now);
}
- final String finalSignature = signature;
-
- preWallEditSuccessfull = true;
-
- return Futures.transform(wallEditFuture, new AsyncFunction<Long, Long>() {
- @Override
- public ListenableFuture<Long> apply(Long input) throws Exception {
- boolean memstoreUpdatesSuccessful = false;
- updatesLock.readLock().lock();
- try {
- // ------------------------------------
- // STEP 4. Write back to memstore
- // ----------------------------------
-
- long addedSize = 0;
- for (int i = firstIndex; i < finalLastIndexExclusive; i++) {
- if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
-
- Mutation op = batchOp.operations[i].getFirst();
- addedSize += applyFamilyMapToMemstore(op.getFamilyMap(), input);
- batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
- }
- memstoreUpdatesSuccessful = true;
- return Futures.immediateFuture(addedSize);
- } finally {
- releaseLocksAndUpdateMetrics(updatesLock, acquiredLocks,
- finalSignature, methodNameForMetricsUpdate, firstIndex,
- finalLastIndexExclusive, batchOp, true,
- memstoreUpdatesSuccessful, now);
- }
- }
- });
- } catch (Throwable t) {
- return Futures.immediateFuture(null);
- } finally {
- releaseLocksAndUpdateMetrics(updatesLock,
- Collections.<Integer> emptyList(), signature,
- methodNameForMetricsUpdate, firstIndex, lastIndexExclusive, batchOp,
- locked, preWallEditSuccessfull, now);
- }
- }
- /**
- * This method was intended for code de-duplication and nothing else.
- */
- private void releaseLocksAndUpdateMetrics(ReentrantReadWriteLock updatedLock,
- List<Integer> acquiredLocks, String signature,
- String methodNameForMetricsUpdate, int firstIndex, int lastIndexExclusive,
- BatchOperationInProgress<Pair<Mutation, Integer>> batchOp, boolean locked,
- boolean preWallEditSuccessfull, long now) {
- if (locked) {
- this.updatesLock.readLock().unlock();
- }
- releaseRowLocks(acquiredLocks);
- // do after lock
- long after = EnvironmentEdgeManager.currentTimeMillis();
- if (null == signature) {
- signature = SchemaMetrics.CF_BAD_FAMILY_PREFIX;
- }
- HRegion.incrTimeVaryingMetric(
- signature + methodNameForMetricsUpdate, after - now);
+ // ------------------------------------
+ // STEP 4. Write back to memstore
+ // ----------------------------------
- if (!preWallEditSuccessfull) {
+ long addedSize = 0;
for (int i = firstIndex; i < lastIndexExclusive; i++) {
- if (batchOp.retCodes[i] == OperationStatusCode.NOT_RUN) {
- batchOp.retCodes[i] = OperationStatusCode.FAILURE;
+ if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
+
+ Mutation op = batchOp.operations[i].getFirst();
+ addedSize += applyFamilyMapToMemstore(op.getFamilyMap(), seqNum);
+ batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
+ }
+ success = true;
+ return addedSize;
+ } finally {
+ if (locked) {
+ this.updatesLock.readLock().unlock();
+ }
+
+ releaseRowLocks(acquiredLocks);
+
+ // do after lock
+ long after = EnvironmentEdgeManager.currentTimeMillis();
+ if (null == signature) {
+ signature = SchemaMetrics.CF_BAD_FAMILY_PREFIX;
+ }
+ HRegion.incrTimeVaryingMetric(signature + methodNameForMetricsUpdate, after - now);
+
+ if (!success) {
+ for (int i = firstIndex; i < lastIndexExclusive; i++) {
+ if (batchOp.retCodes[i] == OperationStatusCode.NOT_RUN) {
+ batchOp.retCodes[i] = OperationStatusCode.FAILURE;
+ }
}
}
+ batchOp.nextIndexToProcess = lastIndexExclusive;
}
- batchOp.nextIndexToProcess = lastIndexExclusive;
}
//TODO, Think that gets/puts and deletes should be refactored a bit so that
@@ -2468,7 +2385,7 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
if (matches) {
// All edits for the given row (across all column families) must happen atomically.
if (isPut) {
- put(((Put)w).getFamilyMap(), writeToWAL).checkedGet();
+ put(((Put)w).getFamilyMap(), writeToWAL);
} else {
Delete d = (Delete)w;
prepareDeleteFamilyMap(d);
@@ -2564,7 +2481,7 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
throws IOException {
Map<byte[], List<KeyValue>> familyMap = new HashMap<byte[], List<KeyValue>>();
familyMap.put(family, edits);
- this.put(familyMap, true).checkedGet();
+ this.put(familyMap, true);
}
/**
@@ -2574,12 +2491,11 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
* @param writeToWAL if true, then we should write to the log
* @throws IOException
*/
- private CheckedFuture<Void, IOException> put(final Map<byte [], List<KeyValue>> familyMap,
+ private void put(final Map<byte [], List<KeyValue>> familyMap,
boolean writeToWAL) throws IOException {
- final long now = EnvironmentEdgeManager.currentTimeMillis();
+ long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
- ListenableFuture<Long> wallEditSyncFuture =
- Futures.immediateFuture(new Long(-1));
+ boolean flush = false;
this.updatesLock.readLock().lock();
try {
checkFamilies(familyMap.keySet());
@@ -2590,43 +2506,30 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
// If order is reversed, i.e. we write to memstore first, and
// for some reason fail to write/sync to commit log, the memstore
// will contain uncommitted transactions.
+ long seqNum = -1;
if (!this.disableWAL && writeToWAL) {
WALEdit walEdit = new WALEdit();
addFamilyMapToWALEdit(familyMap, walEdit);
- wallEditSyncFuture = this.log.append(regionInfo,
+ seqNum = this.log.append(regionInfo,
regionInfo.getTableDesc().getName(), walEdit, now);
}
- return Futures.makeChecked(Futures.transform(wallEditSyncFuture,
- new AsyncFunction<Long, Void>() {
- @Override
- public ListenableFuture<Void> apply(Long input) throws Exception {
- boolean flush = false;
- updatesLock.readLock().lock();
- try {
- long addedSize = applyFamilyMapToMemstore(familyMap, input);
- flush = isFlushSize(incMemoryUsage(addedSize));
- } finally {
- updatesLock.readLock().unlock();
- }
- // do after lock
- long after = EnvironmentEdgeManager.currentTimeMillis();
- String signature = SchemaMetrics.generateSchemaMetricsPrefix(
- getTableDesc().getNameAsString(), familyMap.keySet());
- HRegion.incrTimeVaryingMetric(signature + "put_", after - now);
-
- if (flush) {
- // Request a cache flush. Do it outside update lock.
- requestFlush();
- }
- return Futures.immediateFuture(null);
- }
- }), this.getExceptionMapperFunction());
- } catch (Exception e) {
- return Futures.immediateFailedCheckedFuture(new IOException(e));
+ long addedSize = applyFamilyMapToMemstore(familyMap, seqNum);
+ flush = isFlushSize(this.incMemoryUsage(addedSize));
} finally {
this.updatesLock.readLock().unlock();
}
+
+ // do after lock
+ long after = EnvironmentEdgeManager.currentTimeMillis();
+ String signature = SchemaMetrics.generateSchemaMetricsPrefix(
+ this.getTableDesc().getNameAsString(), familyMap.keySet());
+ HRegion.incrTimeVaryingMetric(signature + "put_", after - now);
+
+ if (flush) {
+ // Request a cache flush. Do it outside update lock.
+ requestFlush();
+ }
}
/**
@@ -3885,9 +3788,8 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
// 6. append/sync all edits at once
// TODO: Do batching as in doMiniBatchPut
- long seqNum;
- seqNum = this.log.append(regionInfo, this.getTableDesc().getName(),
- walEdit, now).checkedGet();
+ long seqNum = this.log.append(regionInfo, this.getTableDesc().getName(),
+ walEdit, now);
// 7. apply to memstore
long addedSize = 0;
@@ -3973,7 +3875,7 @@ public class HRegion implements HeapSize, ConfigurationObserver, HRegionIf {
WALEdit walEdit = new WALEdit();
walEdit.add(newKv);
seqNum = this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
- walEdit, now).checkedGet();
+ walEdit, now);
}
// Now request the ICV to the store, this will set the timestamp
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 207760f..a742a29 100755
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -64,8 +64,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.annotation.Nullable;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
@@ -176,12 +174,7 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -2119,13 +2112,6 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
return this.hlogs[i];
}
- /**
- * Return number of logs present in the regionserver
- */
- public int getLogCount() {
- return this.hlogs.length;
- }
-
public int getTotalHLogCnt() {
return this.hlogs.length;
}
@@ -2979,18 +2965,13 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
}
}
- public CheckedFuture<Integer, IOException> putAsync(final byte[] regionName,
- final List<Put> puts) throws IOException {
- return applyMutations(regionName, puts, "multiput_");
- }
-
@Override
- public int put(final byte[] regionName,
- final List<Put> puts) throws IOException {
- return applyMutations(regionName, puts, "multiput_").checkedGet();
+ public int put(final byte[] regionName, final List<Put> puts)
+ throws IOException {
+ return applyMutations(regionName, puts, "multiput_");
}
- private CheckedFuture<Integer, IOException> applyMutations(final byte[] regionName,
+ private int applyMutations(final byte[] regionName,
final List<? extends Mutation> mutations,
String methodName)
throws IOException {
@@ -3011,45 +2992,18 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
opWithLocks[i++] = new Pair<Mutation, Integer>(p, lock);
}
- ListenableFuture<OperationStatusCode[]> batchMutateFuture =
- region.batchMutateWithLocks(opWithLocks,
+ OperationStatusCode[] codes = region.batchMutateWithLocks(opWithLocks,
methodName);
- return Futures.makeChecked(Futures.transform(batchMutateFuture,
- new AsyncFunction<OperationStatusCode[], Integer>() {
-
- @Override
- public ListenableFuture<Integer> apply(OperationStatusCode[] codes)
- throws Exception {
- try {
- for (int i = 0; i < codes.length; i++) {
- if (codes[i] != OperationStatusCode.SUCCESS)
- return Futures.immediateFuture(i);
- }
- return Futures.immediateFuture(HConstants.MULTIPUT_SUCCESS);
- } catch (Throwable t) {
- return Futures.immediateFailedCheckedFuture(
- convertThrowableToIOE(cleanup(t)));
- }
- }
- }), getConvertThrowableToIOEFunction());
+ for (i = 0; i < codes.length; i++) {
+ if (codes[i] != OperationStatusCode.SUCCESS)
+ return i;
+ }
+ return HConstants.MULTIPUT_SUCCESS;
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
}
}
- private Function<Exception, IOException> getConvertThrowableToIOEFunction() {
- return new Function<Exception, IOException>() {
- @Override
- @Nullable
- public IOException apply(@Nullable Exception e) {
- if (e instanceof ExecutionException) {
- return convertThrowableToIOE(cleanup(e.getCause()));
- }
- return convertThrowableToIOE(cleanup(e));
- }
- };
- }
-
private boolean checkAndMutate(final byte[] regionName, final byte [] row,
final byte [] family, final byte [] qualifier, final byte [] value,
final Writable w, Integer lock) throws IOException {
@@ -3312,7 +3266,7 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
public int delete(final byte[] regionName,
final List<Delete> deletes)
throws IOException {
- return applyMutations(regionName, deletes, "multidelete_").checkedGet();
+ return applyMutations(regionName, deletes, "multidelete_");
}
@Override
@@ -3961,18 +3915,32 @@ public class HRegionServer implements HRegionServerIf, HBaseRPCErrorHandler,
public MultiPutResponse multiPut(MultiPut puts) throws IOException {
MultiPutResponse resp = new MultiPutResponse();
- Map<byte[], CheckedFuture<Integer, IOException>> futuresMap =
- new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ // do each region as it's own.
+ int size = puts.puts.size();
+ int index = 0;
for( Map.Entry<byte[], List<Put>> e: puts.puts.entrySet()) {
- futuresMap.put(e.getKey(), putAsync(e.getKey(), e.getValue()));
- }
- for (Entry<byte[], CheckedFuture<Integer, IOException>> e :
- futuresMap.entrySet()) {
- CheckedFuture<Integer, IOException> f = e.getValue();
- byte[] row = e.getKey();
- int ret = f.checkedGet();
- resp.addResult(row, ret);
+ int result = put(e.getKey(), e.getValue());
+ resp.addResult(e.getKey(), result);
+
+ index++;
+ if (index < size) {
+ // remove the reference to the region list of Puts to save RAM except
+ // for the last one. We will lose the reference to the last one pretty
+ // soon anyway; keep it for a little more, until we get back
+ // to HBaseServer level, where we might need to pretty print
+ // the MultiPut request for debugging slow/large puts.
+ // Note: A single row "put" from client also end up in server
+ // as a multiPut().
+ // We set the value to an empty array so taskmonitor can either get
+ // the old value or an empty array. If we call clear() on the array,
+ // then we might have a race condition where we're iterating over the
+ // array at the same time we clear it (which throws an exception)
+ // This relies on the fact that Map.Entry.setValue() boils down to a
+ // simple reference assignment, which is atomic
+ e.setValue(emptyPutArray); // clear some RAM
+ }
}
+
return resp;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
index 53e9757..6b28f03 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java
@@ -24,6 +24,9 @@ import java.util.LinkedList;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
/**
* Manages the read/write consistency within memstore. This provides
* an interface for readers to determine what entries to ignore, and
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index e7aa701..0cfaa8f 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -63,8 +63,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -82,10 +80,13 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
+import org.apache.hadoop.hbase.ipc.ProfilingData;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionSeqidTransition;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -98,13 +99,7 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -151,9 +146,7 @@ public class HLog implements Syncable {
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
public static final byte [] METAROW = Bytes.toBytes("METAROW");
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
- public static SimpleDateFormat getDateFormat() {
- return new SimpleDateFormat("yyyy-MM-dd-HH");
- }
+ public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd-HH");
/*
* Name of directory that holds recovered edits written by the wal log
@@ -188,8 +181,8 @@ public class HLog implements Syncable {
Collections.synchronizedList(new ArrayList<LogActionsListener>());
- volatile static Class<? extends Writer> logWriterClass;
- volatile static Class<? extends Reader> logReaderClass;
+ static Class<? extends Writer> logWriterClass;
+ static Class<? extends Reader> logReaderClass;
private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer
private int initialReplication; // initial replication factor of SequenceFile.writer
@@ -291,6 +284,9 @@ public class HLog implements Syncable {
private final List<LogEntryVisitor> logEntryVisitors =
new CopyOnWriteArrayList<LogEntryVisitor>();
+ private volatile long lastLogRollStartTimeMillis = 0;
+ private volatile long lastLogRollDurationMillis = 0;
+
/**
* Pattern used to validate a HLog file name
*/
@@ -309,9 +305,9 @@ public class HLog implements Syncable {
synchronized (this) {
min = Math.min(min, val);
max = Math.max(max, val);
- total += val;
- ++count;
}
+ total += val;
+ ++count;
}
Metric get() {
@@ -363,17 +359,14 @@ public class HLog implements Syncable {
*/
private class DoubleListBuffer {
private LinkedList<Entry> currentList = new LinkedList<Entry>();
- private SettableFuture<Void> currentFuture = SettableFuture.create();;
private LinkedList<Entry> syncList = new LinkedList<Entry>();
/**
* Append a log entry into the buffer
* @param entry log entry
*/
- synchronized private ListenableFuture<Void> appendToBuffer(Entry entry) {
- Preconditions.checkNotNull(this.currentFuture);
+ synchronized private void appendToBuffer(Entry entry) {
currentList.add(entry);
- return this.currentFuture;
}
/**
@@ -382,8 +375,6 @@ public class HLog implements Syncable {
* @return number of log entries synced
*/
private int appendAndSync() throws IOException {
- // The future returned to the append operation.
- SettableFuture<Void> future;
synchronized (this) {
if (currentList.isEmpty()) { // no thing to sync
return 0;
@@ -394,49 +385,26 @@ public class HLog implements Syncable {
LinkedList<Entry> tmp = syncList;
syncList = currentList;
currentList = tmp;
- future = currentFuture;
- this.currentFuture = SettableFuture.create();
}
- try {
- // append entries to writer
- int syncedEntries = syncList.size();
- while (!syncList.isEmpty()) {
- Entry entry = syncList.remove();
- writer.append(entry);
- }
-
- // sync the data
- long now = System.currentTimeMillis();
- writer.sync();
- syncTime.inc(System.currentTimeMillis() - now);
- future.set(null);
- return syncedEntries;
- } catch (Exception e) {
- future.setException(e);
- return 0;
+ // append entries to writer
+ int syncedEntries = syncList.size();
+ while (!syncList.isEmpty()) {
+ Entry entry = syncList.remove();
+ writer.append(entry);
}
+
+ // sync the data
+ long now = System.currentTimeMillis();
+ writer.sync();
+ syncTime.inc(System.currentTimeMillis() - now);
+ return syncedEntries;
}
}
private DoubleListBuffer logBuffer = new DoubleListBuffer();
/**
- * Minimal constructor to get going in tests
- */
- public HLog(final Configuration conf, Path basePath) throws IOException {
- this(FileSystem.get(conf), new Path(basePath, ".logs"),
- new Path(basePath, ".oldlogs"), conf, null);
- }
-
- /**
- * Minimal constructor to get going in tests
- */
- public HLog(final Configuration conf) throws IOException {
- this(conf, new Path("/tmp"));
- }
-
- /**
* HLog creating with a null actions listener.
*
* @param fs filesystem handle
@@ -683,6 +651,8 @@ public class HLog implements Syncable {
this.numEntries.set(0);
t1 = EnvironmentEdgeManager.currentTimeMillis();
+ lastLogRollStartTimeMillis = t0;
+ lastLogRollDurationMillis = (t1 - t0);
}
Path oldFile = null;
@@ -729,7 +699,7 @@ public class HLog implements Syncable {
throws IOException {
try {
if (logReaderClass == null) {
- logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
+ logReaderClass =conf.getClass("hbase.regionserver.hlog.reader.impl",
SequenceFileLogReader.class, Reader.class);
}
@@ -738,7 +708,8 @@ public class HLog implements Syncable {
return reader;
} catch (IOException e) {
throw e;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
throw new IOException("Cannot get log reader", e);
}
}
@@ -1081,10 +1052,10 @@ public class HLog implements Syncable {
* of the memstore.
* @throws IOException
*/
- public CheckedFuture<Long, IOException> append(HRegionInfo info, byte[] tableName, WALEdit edits,
+ public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
final long now) throws IOException {
if (!this.enabled || edits.isEmpty()) {
- return Futures.immediateCheckedFuture(-1L);
+ return -1L;
}
if (logSyncerThread.syncerShuttingDown) {
// can't acquire lock for the duration of append()
@@ -1092,13 +1063,13 @@ public class HLog implements Syncable {
throw new IOException("Cannot append; logSyncer shutting down");
}
- final long len = edits.getTotalKeyValueLength();
- final long txid;
- final long seqNum;
+ long len = edits.getTotalKeyValueLength();
+ long txid = 0;
+ long seqNum;
+ long start = System.currentTimeMillis();
byte[] regionName = info.getRegionName();
- ListenableFuture<Void> logSyncFuture;
synchronized (this.appendLock) {
// The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region (i.e. the first edit added to the particular
@@ -1109,54 +1080,50 @@ public class HLog implements Syncable {
this.firstSeqWrittenInCurrentMemstore.putIfAbsent(regionName, seqNum);
HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
- logSyncFuture = doWrite(info, logKey, edits);
+ doWrite(info, logKey, edits);
// Only count 1 row as an unflushed entry.
txid = this.unflushedEntries.incrementAndGet();
}
- if (info.isMetaRegion()) forceSync();
- final long start = System.nanoTime();
+ // Update the metrics
+ this.numEntries.incrementAndGet();
+ writeSize.inc(len);
- return Futures.makeChecked(Futures.transform(logSyncFuture,
- new AsyncFunction<Void, Long>() {
- @Override
- public ListenableFuture<Long> apply(Void input) throws Exception {
- Preconditions.checkArgument(txid >= getCurSyncPoint());
- // Update the metrics
- numEntries.incrementAndGet();
- writeSize.inc(len);
-
- // Update the metrics and log down the outliers
- long end = System.nanoTime();
- long syncTime = end - start;
- gsyncTime.inc(syncTime);
-
- if (syncTime > SECOND_IN_NS) {
- LOG.warn(String.format(
- "%s took %d ns appending an edit to hlog; editcount=%d, len~=%s",
- Thread.currentThread().getName(), syncTime, numEntries.get(),
- StringUtils.humanReadableInt(len)));
- slowSyncs.incrementAndGet();
- }
- if (slowSyncs.get() >= slowBeforeRoll) {
- requestLogRoll();
- slowSyncs.set(0);
- }
- return Futures.immediateFuture(new Long(seqNum));
- }
- }), this.getExceptionMapperFunction());
- }
+ // sync txn to file system
+ start = System.nanoTime();
+ this.sync(info.isMetaRegion(), txid);
- public Function<Exception, IOException> getExceptionMapperFunction() {
- return new Function<Exception, IOException>() {
- @Override
- @Nullable
- public IOException apply(@Nullable Exception e) {
- if (e instanceof ExecutionException) {
- return new IOException(e.getCause());
- }
- return new IOException(e);
+ // Update the metrics and log down the outliers
+ long end = System.nanoTime();
+ long syncTime = end - start;
+ gsyncTime.inc(syncTime);
+ if (syncTime > SECOND_IN_NS) {
+ LOG.warn(String.format(
+ "%s took %d ns appending an edit to hlog; editcount=%d, len~=%s",
+ Thread.currentThread().getName(), syncTime, this.numEntries.get(),
+ StringUtils.humanReadableInt(len)));
+ slowSyncs.incrementAndGet();
+ }
+
+ if (slowSyncs.get() >= slowBeforeRoll) {
+ this.requestLogRoll();
+ this.slowSyncs.set(0);
+ }
+
+ // Update the per-request profiling data
+ Call call = HRegionServer.callContext.get();
+ ProfilingData pData = call == null ? null : call.getProfilingData();
+ if (pData != null) {
+ if (this.lastLogRollStartTimeMillis > start
+ && end > this.lastLogRollStartTimeMillis) {
+ // We also had a log roll in between
+ pData.addLong(ProfilingData.HLOG_ROLL_TIME_MS, this.lastLogRollDurationMillis);
+ // Do not account for this as the sync time.
+ syncTime = syncTime - this.lastLogRollDurationMillis;
}
- };
+ // update sync time
+ pData.addLong(ProfilingData.HLOG_SYNC_TIME_MS, syncTime);
+ }
+ return seqNum;
}
/**
@@ -1199,7 +1166,7 @@ public class HLog implements Syncable {
forceSync = false;
// wake up every 100ms to check if logsyncer has to shut down
queueEmpty.awaitNanos(100*1000000);
- if (unflushedEntries.get() >= syncTillHere) {
+ if (unflushedEntries.get() == syncTillHere) {
// call hflush() if we haven't flushed for a while now
// This force-sync is just a safety mechanism - we being
// paranoid. If there hasn't been any syncing activity for
@@ -1242,18 +1209,6 @@ public class HLog implements Syncable {
}
/**
- * This method needs to be called to indicate that we need to force the
- * log syncer thread to sync.
- * @param force
- */
- public void setForceSync(boolean force) {
- forceSync = false;
- }
-
- public long getCurSyncPoint() {
- return syncTillHere;
- }
- /**
* This method first signals the thread that there's a sync needed
* and then waits for it to happen before returning.
* @throws IOException
@@ -1320,14 +1275,6 @@ public class HLog implements Syncable {
logSyncerThread.addToSyncQueue(force, txid);
}
- public void forceSync() {
- logSyncerThread.setForceSync(true);
- }
-
- public long getCurSyncPoint() {
- return logSyncerThread.getCurSyncPoint();
- }
-
private void hflush() {
synchronized (this.updateLock) {
if (this.closed) {
@@ -1431,9 +1378,9 @@ public class HLog implements Syncable {
}
}
- protected ListenableFuture<Void> doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
+ protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
throws IOException {
- return this.logBuffer.appendToBuffer(new Entry(logKey, logEdit));
+ this.logBuffer.appendToBuffer(new Entry(logKey, logEdit));
}
/** @return How many items have been added to the log */
@@ -1942,8 +1889,7 @@ public class HLog implements Syncable {
String subDirectoryName;
if (ARCHIVE_TO_HOURLY_DIR) {
// Group into hourly sub-directory
- subDirectoryName =
- getDateFormat().format(Calendar.getInstance().getTime());
+ subDirectoryName = DATE_FORMAT.format(Calendar.getInstance().getTime());
} else {
// since the filename is a valid name, we know there
// is a last '.' (won't return -1)
@@ -2441,25 +2387,11 @@ public class HLog implements Syncable {
walEdit.add(kvLast);
walEdit.add(kvNext);
walEdit.add(kvRegionInfo);
- long newSeqid;
- try {
- newSeqid = append(regionInfo, regionInfo.getTableDesc()
- .getName(), walEdit, now).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException(e);
- }
+ long newSeqid = append(regionInfo, regionInfo.getTableDesc().getName(), walEdit, now);
LOG.info("Region " + regionInfo.getRegionNameAsString() +
" sequence id transition " +
seqidTransition.getLastSeqid() + "->" +
seqidTransition.getNextSeqid() + " appended to HLog at seqid=" + newSeqid);
return true;
}
-
- /**
- * Returns the underlying configuration object.
- * @return
- */
- public Configuration getConfiguration() {
- return this.conf;
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index b161f51..1b62945 100644
--- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -2093,8 +2093,4 @@ public class HBaseTestingUtility {
getHBaseAdmin().moveRegion(regionInfo.getRegionName(),
regionServer.toString());
}
-
- public HConnection getConnection() {
- return HConnectionManager.getConnection(this.conf);
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMultiputs.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMultiputs.java b/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMultiputs.java
deleted file mode 100644
index 3b0ef2b..0000000
--- a/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMultiputs.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.StringBytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestAsyncMultiputs {
- private static final Log LOG = LogFactory.getLog(TestAsyncMultiputs.class);
- private static final HBaseTestingUtility util = new HBaseTestingUtility();
- private static int NUM_REGIONSERVERS = 1;
-
- @BeforeClass
- public static void setUp() throws Exception {
- util.startMiniCluster(NUM_REGIONSERVERS);
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- util.shutdownMiniCluster();
- }
-
- /**
- * Test intended to check if the wal gives the control
- * back when writers are calling append.
- * @throws IOException
- * @throws InterruptedException
- */
- @Test
- public void testMultithreadedCallbackFromHLog() throws IOException, InterruptedException {
- final HLog log = new HLog(util.getConfiguration(),
- new Path("/tmp/testMultithreadedCallback"));
- int numThreads = 100;
- ExecutorService executor = Executors.newCachedThreadPool();
- for (int i = 0; i < numThreads; i++) {
- final int threadId = i;
- executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- HTableDescriptor desc =
- new HTableDescriptor(Bytes.toBytes("table" + threadId));
- HRegionInfo info = new HRegionInfo(desc, new byte[0], new byte[0]);
- WALEdit edits = new WALEdit();
- for (byte[] row : getRandomRows(1, 10)) {
- edits.add(new KeyValue(row));
- }
- try {
- log.append(info, desc.getName(), edits,
- System.currentTimeMillis()).get(1, TimeUnit.MINUTES);
- } catch(Exception e) {
- fail();
- }
- return null;
- }
- });
- }
- executor.shutdown();
- executor.awaitTermination(1, TimeUnit.MINUTES);
- }
-
- @Test
- public void testMultiputSucceeds() throws IOException {
- String tableName = "testMultiputSucceeds";
- byte[] tableNameBytes = Bytes.toBytes(tableName);
- String cf = "cf";
- byte[][] families = new byte[][] { Bytes.toBytes(cf) };
- LOG.debug("Creating the table " + tableName);
- int numRegions = 100;
- HRegionServer server = createTableAndGiveMeTheRegionServer(tableNameBytes,
- families, numRegions);
- MultiPut multiput = getRandomMultiPut(tableNameBytes, 1000, families[0],
- server.getServerInfo().getServerAddress());
- server.multiPut(multiput);
- for (HRegion region : server.getOnlineRegionsAsArray()) {
- server.flushRegion(region.getRegionName());
- }
- for (Entry<byte[], List<Put>> entry : multiput.getPuts().entrySet()) {
- byte[] regionName = entry.getKey();
- for (Put p : entry.getValue()) {
- Get g = new Get(p.getRow());
- Result r = server.get(regionName, g);
- assertNotNull(g);
- assertTrue(Bytes.equals(r.getValue(families[0], null), p.getRow()));
- assertTrue(Bytes.equals(r.getRow(), p.getRow()));
- }
- }
- }
-
- public HRegionServer createTableAndGiveMeTheRegionServer(
- byte[] tableName, byte[][] families, int numregions) throws IOException {
- LOG.debug("Creating table " + Bytes.toString(tableName) + " with "
- + numregions + " regions");
- util.createTable(tableName, families, 1, Bytes.toBytes("aaa"),
- Bytes.toBytes("zzz"), numregions);
- assertTrue(NUM_REGIONSERVERS == 1);
- HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
- LOG.debug("Found the regionserver. Returning" + rs);
- return rs;
- }
-
- public static MultiPut getRandomMultiPut(byte[] tableName, int numPuts, byte[] cf,
- HServerAddress addr) throws IOException {
- return getRandomMultiPut(tableName, numPuts, cf, addr, util);
- }
-
- /**
- * Creates random puts with rows between aaa.. to zzz..
- * @param tableName
- * @param numPuts
- * @param cf
- * @param addr
- * @param util : The hbase testing utility to use.
- * @return
- * @throws IOException
- */
- public static MultiPut getRandomMultiPut(byte[] tableName, int numPuts, byte[] cf,
- HServerAddress addr, HBaseTestingUtility util) throws IOException {
- MultiPut multiput = new MultiPut(addr);
- List<Put> puts = getRandomPuts(numPuts, cf);
- LOG.debug("Creating " + numPuts + " puts to insert into the multiput");
- HConnection conn = util.getConnection();
- for (Put put : puts) {
- byte[] regionName = conn.getRegionLocation(new StringBytes(tableName),
- put.getRow(), false).getRegionInfo().getRegionName();
- multiput.add(regionName, put);
- }
- return multiput;
- }
-
- /**
- * Creates puts with rows between aaa... and zzz..
- * @param numPuts
- * @param cf
- * @return
- */
- public static List<Put> getRandomPuts(int numPuts, byte[] cf) {
- List<Put> ret = new ArrayList<Put> (numPuts);
- List<byte[]> rows = getRandomRows(numPuts, 3);
- for (byte[] row : rows) {
- Put p = new Put(row);
- p.add(cf, null, row);
- ret.add(p);
- }
- return ret;
- }
-
- public static List<byte[]> getRandomRows(int numRows, int rowlen) {
- Random rand = new Random();
- List<byte[]> ret = new ArrayList<byte[]>(numRows);
- for (int i = 0; i < numRows; i++) {
- byte[] row = new byte[rowlen];
- for (int j = 0; j < rowlen; j++) {
- row[j] = (byte) ('a' + (byte) rand.nextInt('z' - 'a'));
- }
- ret.add(row);
- }
- return ret;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 3f0c5bb..f22f256 100644
--- a/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -50,9 +50,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
/**
* This class is for testing HCM features
*/
@@ -278,11 +275,9 @@ public class TestHCM {
}
@Override
- public ListenableFuture<OperationStatusCode[]> batchMutateWithLocks(
- Pair<Mutation, Integer>[] putsAndLocks,
+ public OperationStatusCode[] batchMutateWithLocks(Pair<Mutation, Integer>[] putsAndLocks,
String methodName) throws IOException {
- return Futures.immediateFailedFuture(
- new IOException("Test induced failure"));
+ throw new IOException("Test induced failure");
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/test/java/org/apache/hadoop/hbase/master/TestOldLogsCleaner.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestOldLogsCleaner.java b/src/test/java/org/apache/hadoop/hbase/master/TestOldLogsCleaner.java
index 758ae4e..42b7880 100644
--- a/src/test/java/org/apache/hadoop/hbase/master/TestOldLogsCleaner.java
+++ b/src/test/java/org/apache/hadoop/hbase/master/TestOldLogsCleaner.java
@@ -144,11 +144,10 @@ public class TestOldLogsCleaner {
fs.mkdirs(legacyDir);
fs.createNewFile(new Path(legacyDir, "123.456"));
Calendar cal = Calendar.getInstance();
- System.out.println("Now is: " + HLog.getDateFormat().format(cal.getTime()));
+ System.out.println("Now is: " + HLog.DATE_FORMAT.format(cal.getTime()));
for (int i = 0; i < 10; i++) {
cal.add(Calendar.HOUR, -1);
- Path hourDir = new Path(oldLogDir,
- HLog.getDateFormat().format(cal.getTime()));
+ Path hourDir = new Path(oldLogDir, HLog.DATE_FORMAT.format(cal.getTime()));
fs.mkdirs(hourDir);
fs.createNewFile(new Path(hourDir, new Path(fakeMachineName + "." + i)));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index cb4a9ec..437c19c 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -372,7 +372,7 @@ public class TestHRegion extends HBaseTestCase {
puts[i].add(cf, qual, val);
}
- OperationStatusCode[] codes = this.region.put(puts).get();
+ OperationStatusCode[] codes = this.region.put(puts);
assertEquals(10, codes.length);
for (int i = 0; i < 10; i++) {
assertEquals(OperationStatusCode.SUCCESS, codes[i]);
@@ -382,7 +382,7 @@ public class TestHRegion extends HBaseTestCase {
LOG.info("Next a batch put with one invalid family");
puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
try {
- codes = this.region.put(puts).get();
+ codes = this.region.put(puts);
// put should throw an exception
assertTrue(false);
@@ -403,7 +403,7 @@ public class TestHRegion extends HBaseTestCase {
TestThread putter = new TestThread(ctx) {
@Override
public void doWork() throws IOException {
- retFromThread.set(region.put(puts).checkedGet());
+ retFromThread.set(region.put(puts));
}
};
LOG.info("...starting put thread while holding lock");
@@ -439,8 +439,7 @@ public class TestHRegion extends HBaseTestCase {
putsAndLocks.add(pair);
}
- codes = region.batchMutateWithLocks(putsAndLocks.toArray(
- new Pair[0]), "multiput_").get();
+ codes = region.batchMutateWithLocks(putsAndLocks.toArray(new Pair[0]), "multiput_");
LOG.info("...performed put");
for (int i = 0; i < 10; i++) {
assertEquals(OperationStatusCode.SUCCESS, codes[i]);
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncMultiputsFailingWal.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncMultiputsFailingWal.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncMultiputsFailingWal.java
deleted file mode 100644
index 20150c0..0000000
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncMultiputsFailingWal.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import static org.junit.Assert.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.MultiPut;
-import org.apache.hadoop.hbase.client.TestAsyncMultiputs;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.wal.TestHLog.FailingSequenceFileLogWriter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-
-public class TestAsyncMultiputsFailingWal {
- private final Log LOG = LogFactory.getLog(TestAsyncMultiputsFailingWal.class);
-
- @Test
- public void testFailedMultiput() throws Exception {
- String tableName = "testFailedMultiput";
- byte[] tableNameBytes = Bytes.toBytes(tableName);
- String cf = "cf";
- byte[][] families = new byte[][] { Bytes.toBytes(cf) };
- LOG.debug("Creating the table " + tableName);
- int numRegions = 3;
- HBaseTestingUtility localutil = new HBaseTestingUtility();
- localutil.startMiniCluster();
-
- LOG.debug("Creating table " + tableName + " with "
- + numRegions + " regions");
- localutil.createTable(tableNameBytes, families, 1, Bytes.toBytes("aaa"),
- Bytes.toBytes("zzz"), numRegions);
- HRegionServer rs = localutil.getRSForFirstRegionInTable(tableNameBytes);
- LOG.debug("Found the regionserver. Returning" + rs);
-
- HLog.logWriterClass = null;
- assertTrue(rs.getLogCount() == 1);
- HLog log = rs.getLog(0);
- log.getConfiguration().set("hbase.regionserver.hlog.writer.impl",
- FailingSequenceFileLogWriter.class.getName());
- log.rollWriter();
- MultiPut multiput = TestAsyncMultiputs.getRandomMultiPut(
- tableNameBytes, 1000, families[0],
- rs.getServerInfo().getServerAddress(), localutil);
-
- try {
- FailingSequenceFileLogWriter.setFailureMode(true);
- rs.multiPut(multiput);
- FailingSequenceFileLogWriter.setFailureMode(false);
- } catch (Exception e) {
- LOG.debug("Excepted to see an exception", e);
- return;
- }
- fail();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index 59e666a..b7d644e 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.ExecutionException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -139,12 +139,9 @@ public class TestHLog {
* Just write multiple logs then split. Before fix for HADOOP-2283, this
* would fail.
* @throws IOException
- * @throws ExecutionException
- * @throws InterruptedException
*/
@Test
- public void testSplit() throws IOException,
- InterruptedException, ExecutionException {
+ public void testSplit() throws IOException {
final byte [] tableName = Bytes.toBytes(TestHLog.class.getSimpleName());
final byte [] rowName = tableName;
@@ -170,7 +167,7 @@ public class TestHLog {
System.currentTimeMillis(), column));
LOG.info("Region " + i + ": " + edit);
log.append(infos[i], tableName, edit,
- System.currentTimeMillis()).get();
+ System.currentTimeMillis());
}
}
log.rollWriter();
@@ -219,7 +216,7 @@ public class TestHLog {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
- wal.append(info, bytes, kvs, System.currentTimeMillis()).get();
+ wal.append(info, bytes, kvs, System.currentTimeMillis());
}
// Now call sync and try reading. Opening a Reader before you sync just
// gives you EOFE.
@@ -237,7 +234,7 @@ public class TestHLog {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
- wal.append(info, bytes, kvs, System.currentTimeMillis()).get();
+ wal.append(info, bytes, kvs, System.currentTimeMillis());
}
reader = HLog.getReader(fs, walPath, conf);
count = 0;
@@ -256,7 +253,7 @@ public class TestHLog {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, value));
- wal.append(info, bytes, kvs, System.currentTimeMillis()).get();
+ wal.append(info, bytes, kvs, System.currentTimeMillis());
}
// Now I should have written out lots of blocks. Sync then read.
wal.sync();
@@ -353,7 +350,7 @@ public class TestHLog {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName));
- wal.append(regioninfo, tableName, kvs, System.currentTimeMillis()).get();
+ wal.append(regioninfo, tableName, kvs, System.currentTimeMillis());
}
// Now call sync to send the data to HDFS datanodes
wal.sync(true);
@@ -451,12 +448,9 @@ public class TestHLog {
/**
* Tests that we can write out an edit, close, and then read it back in again.
* @throws IOException
- * @throws ExecutionException
- * @throws InterruptedException
*/
@Test
- public void testEditAdd() throws IOException,
- InterruptedException, ExecutionException {
+ public void testEditAdd() throws IOException {
final int COL_COUNT = 10;
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
@@ -475,7 +469,7 @@ public class TestHLog {
HRegionInfo info = new HRegionInfo(new HTableDescriptor(tableName),
row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
final byte [] regionName = info.getRegionName();
- log.append(info, tableName, cols, System.currentTimeMillis()).get();
+ log.append(info, tableName, cols, System.currentTimeMillis());
long logSeqId = log.startCacheFlush(info.getRegionName());
log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion());
log.close();
@@ -547,12 +541,9 @@ public class TestHLog {
/**
* @throws IOException
- * @throws ExecutionException
- * @throws InterruptedException
*/
@Test
- public void testAppend() throws IOException,
- InterruptedException, ExecutionException {
+ public void testAppend() throws IOException {
final int COL_COUNT = 10;
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
@@ -570,7 +561,7 @@ public class TestHLog {
}
HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
- log.append(hri, tableName, cols, System.currentTimeMillis()).get();
+ log.append(hri, tableName, cols, System.currentTimeMillis());
long logSeqId = log.startCacheFlush(hri.getRegionName());
log.completeCacheFlush(hri.getRegionName(), tableName, logSeqId, false);
log.close();
@@ -650,13 +641,13 @@ public class TestHLog {
}
private void addEdits(HLog log, HRegionInfo hri, byte [] tableName,
- int times) throws IOException, InterruptedException, ExecutionException {
+ int times) throws IOException {
final byte [] row = Bytes.toBytes("row");
for (int i = 0; i < times; i++) {
long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, row, row, timestamp, row));
- log.append(hri, tableName, cols, timestamp).checkedGet();
+ log.append(hri, tableName, cols, timestamp);
}
}
@@ -673,23 +664,6 @@ public class TestHLog {
}
}
- public static class FailingSequenceFileLogWriter
- extends SequenceFileLogWriter implements HLog.Writer {
- static boolean enabled = false;
- public synchronized static void setFailureMode(boolean enabled) {
- FailingSequenceFileLogWriter.enabled = enabled;
- }
-
- @Override
- public void sync() throws IOException {
- if (FailingSequenceFileLogWriter.enabled) {
- throw new IOException(
- "Failing here to see if the error propogates upstream");
- }
- super.sync();
- }
- }
-
static class DumbLogEntriesVisitor implements LogEntryVisitor {
int increments = 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index 8d7b3b1..6a5e4a2 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -52,7 +52,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -541,8 +540,7 @@ public class TestHLogSplit {
* safely commence on the master.
* */
@Test
- public void testLogRollAfterSplitStart() throws IOException,
- InterruptedException, ExecutionException {
+ public void testLogRollAfterSplitStart() throws IOException {
// set flush interval to a large number so it doesn't interrupt us
final String F_INTERVAL = "hbase.regionserver.optionallogflushinterval";
long oldFlushInterval = conf.getLong(F_INTERVAL, 1000);
@@ -563,7 +561,7 @@ public class TestHLogSplit {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName));
- log.append(regioninfo, tableName, kvs, System.currentTimeMillis()).get();
+ log.append(regioninfo, tableName, kvs, System.currentTimeMillis());
}
// Send the data to HDFS datanodes and close the HDFS writer
log.sync(true);
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java
index 2297b40..d1f6890 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java
@@ -97,7 +97,7 @@ public class TestLogActionsListener {
KeyValue kv = new KeyValue(b,b,b);
WALEdit edit = new WALEdit();
edit.add(kv);
- hlog.append(hri, b, edit, 0).get();
+ hlog.append(hri, b, edit, 0);
if (i == 10) {
hlog.addLogActionsListerner(laterList);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/da748d43/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index b8571c4..62e954f 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -433,14 +432,14 @@ public class TestWALReplay {
long now = ee.currentTimeMillis();
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
now, rowName));
- wal.append(hri, tableName, edit, now).get();
+ wal.append(hri, tableName, edit, now);
// Delete the c family to verify deletes make it over.
edit = new WALEdit();
now = ee.currentTimeMillis();
edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now,
KeyValue.Type.DeleteFamily));
- wal.append(hri, tableName, edit, now).get();
+ wal.append(hri, tableName, edit, now);
// Sync.
wal.sync();
@@ -580,7 +579,7 @@ public class TestWALReplay {
private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
final byte [] rowName, final byte [] family,
final int count, EnvironmentEdge ee, final HLog wal)
- throws IOException, InterruptedException, ExecutionException {
+ throws IOException {
String familyStr = Bytes.toString(family);
for (int j = 0; j < count; j++) {
byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
@@ -588,7 +587,7 @@ public class TestWALReplay {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, qualifierBytes,
ee.currentTimeMillis(), columnBytes));
- wal.append(hri, tableName, edit, ee.currentTimeMillis()).checkedGet();
+ wal.append(hri, tableName, edit, ee.currentTimeMillis());
}
}