You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2014/09/26 04:22:21 UTC
git commit: HBASE-12086 Fix bug of HTableMultipliexer
Repository: hbase
Updated Branches:
refs/heads/master 31ed81744 -> 78d532e5f
HBASE-12086 Fix bug of HTableMultipliexer
Signed-off-by: Elliott Clark <ec...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/78d532e5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/78d532e5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/78d532e5
Branch: refs/heads/master
Commit: 78d532e5f344edda04fb9ce44bef9cd79e0d1935
Parents: 31ed817
Author: David Deng <da...@gmail.com>
Authored: Tue Sep 23 22:46:03 2014 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Thu Sep 25 11:30:29 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncProcess.java | 27 ++-
.../org/apache/hadoop/hbase/client/HTable.java | 17 +-
.../hadoop/hbase/client/HTableMultiplexer.java | 207 +++++++++----------
.../hbase/client/TestHTableMultiplexer.java | 65 +++---
4 files changed, 171 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/78d532e5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index f8cdb57..2dbe263 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
@@ -53,6 +52,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.htrace.Trace;
+
import com.google.common.annotations.VisibleForTesting;
/**
@@ -288,6 +288,10 @@ class AsyncProcess {
this.rpcFactory = rpcFactory;
}
+ /**
+ * @return pool if non null, otherwise returns this.pool if non null, otherwise throws
+ * RuntimeException
+ */
private ExecutorService getPool(ExecutorService pool) {
if (pool != null) return pool;
if (this.pool != null) return this.pool;
@@ -352,8 +356,8 @@ class AsyncProcess {
RegionLocations locs = hConnection.locateRegion(
tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
- throw new IOException("#" + id + ", no location found, aborting submit for" +
- " tableName=" + tableName + " rowkey=" + Arrays.toString(r.getRow()));
+ throw new IOException("#" + id + ", no location found, aborting submit for"
+ + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
}
loc = locs.getDefaultRegionLocation();
} catch (IOException ex) {
@@ -383,15 +387,24 @@ class AsyncProcess {
if (retainedActions.isEmpty()) return NO_REQS_RESULT;
+ return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
+ locationErrors, locationErrorRows, actionsByServer, pool);
+ }
+
+ <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
+ List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
+ Object[] results, boolean needResults, List<Exception> locationErrors,
+ List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
+ ExecutorService pool) {
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
- tableName, retainedActions, nonceGroup, pool, callback, null, needResults);
+ tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
// Add location errors if any
if (locationErrors != null) {
for (int i = 0; i < locationErrors.size(); ++i) {
int originalIndex = locationErrorRows.get(i);
Row row = retainedActions.get(originalIndex).getAction();
ars.manageError(originalIndex, row,
- Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
+ Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
}
}
ars.sendMultiAction(actionsByServer, 1, null, false);
@@ -406,7 +419,7 @@ class AsyncProcess {
* @param actionsByServer the multiaction per server
* @param nonceGroup Nonce group.
*/
- private void addAction(ServerName server, byte[] regionName, Action<Row> action,
+ private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
MultiAction<Row> multiAction = actionsByServer.get(server);
if (multiAction == null) {
@@ -531,7 +544,7 @@ class AsyncProcess {
return ars;
}
- private void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
+ private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
if (!(r instanceof Append) && !(r instanceof Increment)) return;
action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/78d532e5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 26da937..8a6575e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -319,6 +319,13 @@ public class HTable implements HTableInterface, RegionLocator {
}
/**
+ * @return maxKeyValueSize from configuration.
+ */
+ public static int getMaxKeyValueSize(Configuration conf) {
+ return conf.getInt("hbase.client.keyvalue.maxsize", -1);
+ }
+
+ /**
* setup this HTable's parameter based on the passed configuration
*/
private void finishSetup() throws IOException {
@@ -348,8 +355,7 @@ public class HTable implements HTableInterface, RegionLocator {
ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
multiAp = this.connection.getAsyncProcess();
- this.maxKeyValueSize = this.configuration.getInt(
- "hbase.client.keyvalue.maxsize", -1);
+ this.maxKeyValueSize = getMaxKeyValueSize(this.configuration);
this.closed = false;
}
@@ -1470,7 +1476,12 @@ public class HTable implements HTableInterface, RegionLocator {
}
// validate for well-formedness
- public void validatePut(final Put put) throws IllegalArgumentException{
+ public void validatePut(final Put put) throws IllegalArgumentException {
+ validatePut(put, maxKeyValueSize);
+ }
+
+ // validate for well-formedness
+ public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert");
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/78d532e5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 9f5e836..e8c6909 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -22,13 +22,12 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -41,8 +40,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@@ -67,35 +67,35 @@ public class HTableMultiplexer {
static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms";
- private Map<TableName, HTable> tableNameToHTableMap;
-
/** The map between each region server to its corresponding buffer queue */
- private Map<HRegionLocation, LinkedBlockingQueue<PutStatus>>
- serverToBufferQueueMap;
+ private final Map<HRegionLocation, LinkedBlockingQueue<PutStatus>> serverToBufferQueueMap =
+ new ConcurrentHashMap<>();
/** The map between each region server to its flush worker */
- private Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap;
+ private final Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap =
+ new ConcurrentHashMap<>();
- private Configuration conf;
- private int retryNum;
+ private final Configuration conf;
+ private final ClusterConnection conn;
+ private final ExecutorService pool;
+ private final int retryNum;
private int perRegionServerBufferQueueSize;
+ private final int maxKeyValueSize;
/**
- *
* @param conf The HBaseConfiguration
- * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops
- * for each region server before dropping the request.
+ * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops for
+ * each region server before dropping the request.
*/
- public HTableMultiplexer(Configuration conf,
- int perRegionServerBufferQueueSize) throws ZooKeeperConnectionException {
+ public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize)
+ throws IOException {
this.conf = conf;
- this.serverToBufferQueueMap = new ConcurrentHashMap<HRegionLocation,
- LinkedBlockingQueue<PutStatus>>();
- this.serverToFlushWorkerMap = new ConcurrentHashMap<HRegionLocation, HTableFlushWorker>();
- this.tableNameToHTableMap = new ConcurrentSkipListMap<TableName, HTable>();
+ this.conn = (ClusterConnection) ConnectionFactory.createConnection(conf);
+ this.pool = HTable.getDefaultExecutor(conf);
this.retryNum = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
+ this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
}
/**
@@ -110,10 +110,6 @@ public class HTableMultiplexer {
return put(tableName, put, this.retryNum);
}
- public boolean put(byte[] tableName, final Put put) throws IOException {
- return put(TableName.valueOf(tableName), put);
- }
-
/**
* The puts request will be buffered by their corresponding buffer queue.
* Return the list of puts which could not be queued.
@@ -165,15 +161,14 @@ public class HTableMultiplexer {
return false;
}
- LinkedBlockingQueue<PutStatus> queue;
- HTable htable = getHTable(tableName);
try {
- htable.validatePut(put);
- HRegionLocation loc = htable.getRegionLocation(put.getRow(), false);
+ HTable.validatePut(put, maxKeyValueSize);
+ HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
if (loc != null) {
// Add the put pair into its corresponding queue.
- queue = addNewRegionServer(loc, htable);
- // Generate a MultiPutStatus obj and offer it into the queue
+
+ LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
+ // Generate a MultiPutStatus object and offer it into the queue
PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
return queue.offer(s);
@@ -196,43 +191,30 @@ public class HTableMultiplexer {
return new HTableMultiplexerStatus(serverToFlushWorkerMap);
}
-
- private HTable getHTable(TableName tableName) throws IOException {
- HTable htable = this.tableNameToHTableMap.get(tableName);
- if (htable == null) {
- synchronized (this.tableNameToHTableMap) {
- htable = this.tableNameToHTableMap.get(tableName);
- if (htable == null) {
- htable = new HTable(conf, tableName);
- this.tableNameToHTableMap.put(tableName, htable);
+ private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
+ LinkedBlockingQueue<PutStatus> queue = serverToBufferQueueMap.get(addr);
+ if (queue == null) {
+ synchronized (this.serverToBufferQueueMap) {
+ queue = serverToBufferQueueMap.get(addr);
+ if (queue == null) {
+ // Create a queue for the new region server
+ queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
+ serverToBufferQueueMap.put(addr, queue);
+
+ // Create the flush worker
+ HTableFlushWorker worker =
+ new HTableFlushWorker(conf, this.conn, addr, this, queue, pool);
+ this.serverToFlushWorkerMap.put(addr, worker);
+
+ // Launch a daemon thread to flush the puts
+ // from the queue to its corresponding region server.
+ String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + (poolID++);
+ Thread t = new Thread(worker, name);
+ t.setDaemon(true);
+ t.start();
}
}
}
- return htable;
- }
-
- private synchronized LinkedBlockingQueue<PutStatus> addNewRegionServer(
- HRegionLocation addr, HTable htable) {
- LinkedBlockingQueue<PutStatus> queue =
- serverToBufferQueueMap.get(addr);
- if (queue == null) {
- // Create a queue for the new region server
- queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
- serverToBufferQueueMap.put(addr, queue);
-
- // Create the flush worker
- HTableFlushWorker worker = new HTableFlushWorker(conf, addr,
- this, queue, htable);
- this.serverToFlushWorkerMap.put(addr, worker);
-
- // Launch a daemon thread to flush the puts
- // from the queue to its corresponding region server.
- String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-"
- + (poolID++);
- Thread t = new Thread(worker, name);
- t.setDaemon(true);
- t.start();
- }
return queue;
}
@@ -405,28 +387,25 @@ public class HTableMultiplexer {
}
private static class HTableFlushWorker implements Runnable {
- private HRegionLocation addr;
- private Configuration conf;
- private LinkedBlockingQueue<PutStatus> queue;
- private HTableMultiplexer htableMultiplexer;
- private AtomicLong totalFailedPutCount;
- private AtomicInteger currentProcessingPutCount;
- private AtomicAverageCounter averageLatency;
- private AtomicLong maxLatency;
- private HTable htable; // For Multi
+ private final HRegionLocation addr;
+ private final Configuration conf;
+ private final ClusterConnection conn;
+ private final LinkedBlockingQueue<PutStatus> queue;
+ private final HTableMultiplexer htableMultiplexer;
+ private final AtomicLong totalFailedPutCount = new AtomicLong(0);
+ private final AtomicInteger currentProcessingPutCount = new AtomicInteger(0);
+ private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
+ private final AtomicLong maxLatency = new AtomicLong(0);
+ private final ExecutorService pool;
- public HTableFlushWorker(Configuration conf, HRegionLocation addr,
- HTableMultiplexer htableMultiplexer,
- LinkedBlockingQueue<PutStatus> queue, HTable htable) {
+ public HTableFlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
+ HTableMultiplexer htableMultiplexer, LinkedBlockingQueue<PutStatus> queue, ExecutorService pool) {
this.addr = addr;
this.conf = conf;
+ this.conn = conn;
this.htableMultiplexer = htableMultiplexer;
this.queue = queue;
- this.totalFailedPutCount = new AtomicLong(0);
- this.currentProcessingPutCount = new AtomicInteger(0);
- this.averageLatency = new AtomicAverageCounter();
- this.maxLatency = new AtomicLong(0);
- this.htable = htable;
+ this.pool = pool;
}
public long getTotalFailedCount() {
@@ -466,7 +445,7 @@ public class HTableMultiplexer {
@edu.umd.cs.findbugs.annotations.SuppressWarnings
(value = "REC_CATCH_EXCEPTION", justification = "na")
public void run() {
- List<PutStatus> processingList = new ArrayList<PutStatus>();
+ List<PutStatus> processingList = new ArrayList<>();
/**
* The frequency in milliseconds for the current thread to process the corresponding
* buffer queue.
@@ -481,6 +460,8 @@ public class HTableMultiplexer {
Thread.currentThread().interrupt();
}
+ AsyncProcess ap = conn.getAsyncProcess();
+
long start, elapsed;
int failedCount = 0;
while (true) {
@@ -496,16 +477,29 @@ public class HTableMultiplexer {
currentProcessingPutCount.set(processingList.size());
if (processingList.size() > 0) {
- ArrayList<Put> list = new ArrayList<Put>(processingList.size());
- for (PutStatus putStatus: processingList) {
- list.add(putStatus.getPut());
+ List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
+ MultiAction<Row> actions = new MultiAction<>();
+ for (int i = 0; i < processingList.size(); i++) {
+ PutStatus putStatus = processingList.get(i);
+ Action<Row> action = new Action<Row>(putStatus.getPut(), i);
+ actions.add(putStatus.getRegionInfo().getRegionName(), action);
+ retainedActions.add(action);
}
- // Process this multiput request
- List<Put> failed = null;
- Object[] results = new Object[list.size()];
+ // Process this multi-put request
+ List<PutStatus> failed = null;
+ Object[] results = new Object[actions.size()];
+ ServerName server = addr.getServerName();
+ Map<ServerName, MultiAction<Row>> actionsByServer =
+ Collections.singletonMap(server, actions);
try {
- htable.batch(list, results);
+ AsyncRequestFuture arf =
+ ap.submitMultiActions(null, retainedActions, 0L, null, results,
+ true, null, null, actionsByServer, pool);
+ arf.waitUntilDone();
+ if (arf.hasError()) {
+ throw arf.getErrors();
+ }
} catch (IOException e) {
LOG.debug("Caught some exceptions " + e
+ " when flushing puts to region server " + addr.getHostnamePort());
@@ -515,35 +509,26 @@ public class HTableMultiplexer {
// results are returned in the same order as the requests in list
// walk the list backwards, so we can remove from list without
// impacting the indexes of earlier members
- for (int i = results.length - 1; i >= 0; i--) {
- if (results[i] instanceof Result) {
- // successful Puts are removed from the list here.
- list.remove(i);
+ for (int i = 0; i < results.length; i++) {
+ if (results[i] == null) {
+ if (failed == null) {
+ failed = new ArrayList<PutStatus>();
+ }
+ failed.add(processingList.get(i));
}
}
- failed = list;
}
if (failed != null) {
- if (failed.size() == processingList.size()) {
- // All the puts for this region server are failed. Going to retry it later
- for (PutStatus putStatus: processingList) {
- if (!resubmitFailedPut(putStatus, this.addr)) {
- failedCount++;
- }
- }
- } else {
- Set<Put> failedPutSet = new HashSet<Put>(failed);
- for (PutStatus putStatus: processingList) {
- if (failedPutSet.contains(putStatus.getPut())
- && !resubmitFailedPut(putStatus, this.addr)) {
- failedCount++;
- }
+ // Resubmit failed puts
+ for (PutStatus putStatus : processingList) {
+ if (!resubmitFailedPut(putStatus, this.addr)) {
+ failedCount++;
}
}
+ // Update the totalFailedCount
+ this.totalFailedPutCount.addAndGet(failedCount);
}
- // Update the totalFailedCount
- this.totalFailedPutCount.addAndGet(failedCount);
elapsed = EnvironmentEdgeManager.currentTime() - start;
// Update latency counters
@@ -580,7 +565,7 @@ public class HTableMultiplexer {
// Log all the exceptions and move on
LOG.debug("Caught some exceptions " + e
+ " when flushing puts to region server "
- + addr.getHostnamePort());
+ + addr.getHostnamePort(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/78d532e5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
index 4fa6678..26fe485 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
@@ -27,8 +27,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -64,10 +64,27 @@ public class TestHTableMultiplexer {
TEST_UTIL.shutdownMiniCluster();
}
+ private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality)
+ throws Exception {
+ // verify that the Get returns the correct result
+ Result r;
+ Get get = new Get(row);
+ get.addColumn(FAMILY, QUALIFIER);
+ int nbTry = 0;
+ do {
+ assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50);
+ nbTry++;
+ Thread.sleep(100);
+ r = htable.get(get);
+ } while (r == null || r.getValue(FAMILY, QUALIFIER) == null);
+ assertEquals("value", Bytes.toStringBinary(VALUE1),
+ Bytes.toStringBinary(r.getValue(FAMILY, QUALIFIER)));
+ }
+
@Test
public void testHTableMultiplexer() throws Exception {
- TableName TABLE =
- TableName.valueOf("testHTableMultiplexer");
+ TableName TABLE_1 = TableName.valueOf("testHTableMultiplexer_1");
+ TableName TABLE_2 = TableName.valueOf("testHTableMultiplexer_2");
final int NUM_REGIONS = 10;
final int VERSION = 3;
List<Put> failedPuts;
@@ -76,35 +93,35 @@ public class TestHTableMultiplexer {
HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
PER_REGIONSERVER_QUEUE_SIZE);
- HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, VERSION,
+ HTable htable1 =
+ TEST_UTIL.createTable(TABLE_1, new byte[][] { FAMILY }, VERSION,
Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
- TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
+ HTable htable2 =
+ TEST_UTIL.createTable(TABLE_2, new byte[][] { FAMILY }, VERSION, Bytes.toBytes("aaaaa"),
+ Bytes.toBytes("zzzzz"), NUM_REGIONS);
+ TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_1);
+ TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_2);
- byte[][] startRows = ht.getStartKeys();
- byte[][] endRows = ht.getEndKeys();
+ byte[][] startRows = htable1.getStartKeys();
+ byte[][] endRows = htable1.getEndKeys();
// SinglePut case
for (int i = 0; i < NUM_REGIONS; i++) {
byte [] row = startRows[i];
if (row == null || row.length <= 0) continue;
- Put put = new Put(row);
- put.add(FAMILY, QUALIFIER, VALUE1);
- success = multiplexer.put(TABLE, put);
- assertTrue(success);
+ Put put = new Put(row).add(FAMILY, QUALIFIER, VALUE1);
+ success = multiplexer.put(TABLE_1, put);
+ assertTrue("multiplexer.put returns", success);
+
+ put = new Put(row).add(FAMILY, QUALIFIER, VALUE1);
+ success = multiplexer.put(TABLE_2, put);
+ assertTrue("multiplexer.put failed", success);
- LOG.info("Put for " + Bytes.toString(startRows[i]) + " @ iteration " + (i+1));
+ LOG.info("Put for " + Bytes.toStringBinary(startRows[i]) + " @ iteration " + (i + 1));
// verify that the Get returns the correct result
- Get get = new Get(startRows[i]);
- get.addColumn(FAMILY, QUALIFIER);
- Result r;
- int nbTry = 0;
- do {
- assertTrue(nbTry++ < 50);
- Thread.sleep(100);
- r = ht.get(get);
- } while (r == null || r.getValue(FAMILY, QUALIFIER) == null);
- assertEquals(0, Bytes.compareTo(VALUE1, r.getValue(FAMILY, QUALIFIER)));
+ checkExistence(htable1, startRows[i], FAMILY, QUALIFIER);
+ checkExistence(htable2, startRows[i], FAMILY, QUALIFIER);
}
// MultiPut case
@@ -116,7 +133,7 @@ public class TestHTableMultiplexer {
put.add(FAMILY, QUALIFIER, VALUE2);
multiput.add(put);
}
- failedPuts = multiplexer.put(TABLE, multiput);
+ failedPuts = multiplexer.put(TABLE_1, multiput);
assertTrue(failedPuts == null);
// verify that the Get returns the correct result
@@ -130,7 +147,7 @@ public class TestHTableMultiplexer {
do {
assertTrue(nbTry++ < 50);
Thread.sleep(100);
- r = ht.get(get);
+ r = htable1.get(get);
} while (r == null || r.getValue(FAMILY, QUALIFIER) == null ||
Bytes.compareTo(VALUE2, r.getValue(FAMILY, QUALIFIER)) != 0);
}