You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2008/10/18 00:03:30 UTC
svn commit: r705770 - in /hadoop/hbase/trunk: ./ conf/
src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/ipc/
src/java/org/apache/hadoop/hbase/master/
src/java/org/apache/hadoop/hbase/regions...
Author: jdcryans
Date: Fri Oct 17 15:03:29 2008
New Revision: 705770
URL: http://svn.apache.org/viewvc?rev=705770&view=rev
Log:
HBASE-748 Add an efficient way to batch update many rows
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/conf/hbase-default.xml
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Oct 17 15:03:29 2008
@@ -54,6 +54,7 @@
(Andrzej Bialecki via Stack)
OPTIMIZATIONS
+ HBASE-748 Add an efficient way to batch update many rows
HBASE-887 Fix a hotspot in scanners
Release 0.18.0 - September 21st, 2008
Modified: hadoop/hbase/trunk/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/conf/hbase-default.xml?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/conf/hbase-default.xml (original)
+++ hadoop/hbase/trunk/conf/hbase-default.xml Fri Oct 17 15:03:29 2008
@@ -52,6 +52,13 @@
</description>
</property>
<property>
+ <name>hbase.client.write.buffer</name>
+ <value>10485760</value>
+ <description>Size of the write buffer in bytes. A bigger buffer takes more
+ memory but reduces the number of RPCs.
+ </description>
+ </property>
+ <property>
<name>hbase.master.meta.thread.rescanfrequency</name>
<value>60000</value>
<description>How long the HMaster sleeps (in milliseconds) between scans of
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java Fri Oct 17 15:03:29 2008
@@ -138,4 +138,16 @@
*/
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
throws IOException, RuntimeException;
+
+ /**
+ * Pass in a ServerCallable with your particular bit of logic defined and
+ * this method will pass it to the defined region server once, without retries.
+ * @param <T> the type of the return value
+ * @param callable
+ * @return an object of type T
+ * @throws IOException
+ * @throws RuntimeException
+ */
+ public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable)
+ throws IOException, RuntimeException;
}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Oct 17 15:03:29 2008
@@ -460,7 +460,6 @@
final byte [] tableName, final byte [] row, boolean useCache)
throws IOException{
HRegionLocation location = null;
-
// if we're supposed to be using the cache, then check it for a possible
// hit. otherwise, delete any existing cached location so it won't
// interfere.
@@ -472,7 +471,7 @@
} else {
deleteCachedLocation(tableName, row);
}
-
+
// build the key of the meta region we should be looking for.
// the extra 9's on the end are necessary to allow "exact" matches
// without knowing the precise region names.
@@ -879,6 +878,29 @@
}
return null;
}
+
+ public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable)
+ throws IOException, RuntimeException {
+ getMaster();
+ try {
+ callable.instantiateServer(false);
+ return callable.call();
+ } catch (Throwable t) {
+ if (t instanceof UndeclaredThrowableException) {
+ t = t.getCause();
+ }
+ if (t instanceof RemoteException) {
+ t = RemoteExceptionHandler.decodeRemoteException((RemoteException) t);
+ }
+ if (t instanceof DoNotRetryIOException) {
+ throw (DoNotRetryIOException) t;
+ }
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ }
+ }
+ return null;
+ }
void close(boolean stopProxy) {
if (master != null) {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Fri Oct 17 15:03:29 2008
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -54,6 +55,10 @@
private final HConnection connection;
private final byte [] tableName;
private HBaseConfiguration configuration;
+ private ArrayList<BatchUpdate> writeBuffer;
+ private long writeBufferSize;
+ private boolean autoFlush;
+ private long currentWriteBufferSize;
private int scannerCaching;
/**
@@ -103,6 +108,11 @@
this.configuration = conf;
this.tableName = tableName;
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
+ this.writeBuffer = new ArrayList<BatchUpdate>();
+ this.writeBufferSize =
+ this.configuration.getLong("hbase.client.write.buffer", 10485760);
+ this.autoFlush = true;
+ this.currentWriteBufferSize = 0;
this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 30);
}
@@ -964,6 +974,7 @@
/**
* Commit a BatchUpdate to the table.
+ * If autoFlush is false, the update is buffered
* @param batchUpdate
* @throws IOException
*/
@@ -974,6 +985,7 @@
/**
* Commit a BatchUpdate to the table using existing row lock.
+ * If autoFlush is false, the update is buffered
* @param batchUpdate
* @param rl Existing row lock
* @throws IOException
@@ -982,43 +994,104 @@
final RowLock rl)
throws IOException {
checkRowAndColumns(batchUpdate);
- connection.getRegionServerWithRetries(
- new ServerCallable<Boolean>(connection, tableName, batchUpdate.getRow()) {
- public Boolean call() throws IOException {
- long lockId = -1L;
- if(rl != null) {
- lockId = rl.getLockId();
- }
- server.batchUpdate(location.getRegionInfo().getRegionName(),
- batchUpdate, lockId);
- return null;
- }
- }
- );
+ writeBuffer.add(batchUpdate);
+ currentWriteBufferSize += batchUpdate.getSize();
+ if(autoFlush || currentWriteBufferSize > writeBufferSize) {
+ flushCommits();
+ }
}
/**
- * Commit a RowsBatchUpdate to the table.
+ * Commit a List of BatchUpdate to the table.
+ * If autoFlush is false, the updates are buffered
* @param batchUpdates
* @throws IOException
*/
- public synchronized void commit(final List<BatchUpdate> batchUpdates)
- throws IOException {
- for (BatchUpdate batchUpdate : batchUpdates)
- commit(batchUpdate,null);
+ public synchronized void commit(final List<BatchUpdate> batchUpdates)
+ throws IOException {
+ for(BatchUpdate bu : batchUpdates) {
+ checkRowAndColumns(bu);
+ writeBuffer.add(bu);
+ currentWriteBufferSize += bu.getSize();
+ }
+ if(autoFlush || currentWriteBufferSize > writeBufferSize) {
+ flushCommits();
+ }
}
/**
- * Utility method that checks rows existence, length and
- * columns well formedness.
+ * Commit to the table the buffer of BatchUpdate.
+ * Called automatically in the commit methods when autoFlush is true or the buffer exceeds its size.
+ * @throws IOException
+ */
+ public void flushCommits() throws IOException {
+ try {
+ // See HBASE-748 for pseudo code of this method
+ if (writeBuffer.isEmpty()) {
+ return;
+ }
+ Collections.sort(writeBuffer);
+ List<BatchUpdate> tempUpdates = new ArrayList<BatchUpdate>();
+ byte[] currentRegion = connection.getRegionLocation(tableName,
+ writeBuffer.get(0).getRow(), false).getRegionInfo().getRegionName();
+ byte[] region = currentRegion;
+ boolean isLastRow = false;
+ for (int i = 0; i < writeBuffer.size(); i++) {
+ BatchUpdate batchUpdate = writeBuffer.get(i);
+ tempUpdates.add(batchUpdate);
+ isLastRow = (i + 1) == writeBuffer.size();
+ if (!isLastRow) {
+ region = connection.getRegionLocation(tableName,
+ writeBuffer.get(i + 1).getRow(), false).getRegionInfo()
+ .getRegionName();
+ }
+ if (!Bytes.equals(currentRegion, region) || isLastRow) {
+ final BatchUpdate[] updates = tempUpdates.toArray(new BatchUpdate[0]);
+ int index = connection
+ .getRegionServerForWithoutRetries(new ServerCallable<Integer>(
+ connection, tableName, batchUpdate.getRow()) {
+ public Integer call() throws IOException {
+ int i = server.batchUpdates(location.getRegionInfo()
+ .getRegionName(), updates);
+ return i;
+ }
+ });
+ if (index != updates.length - 1) {
+ // Basic waiting time. If many updates are flushed, tests have shown
+ // that this is barely needed but when committing 1 update this may
+ // get retried hundreds of times.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ i = i - updates.length + index;
+ region = connection.getRegionLocation(tableName,
+ writeBuffer.get(i + 1).getRow(), true).getRegionInfo()
+ .getRegionName();
+
+ }
+ currentRegion = region;
+ tempUpdates.clear();
+ }
+ }
+ } finally {
+ currentWriteBufferSize = 0;
+ writeBuffer.clear();
+ }
+ }
+
+ /**
+ * Utility method that checks rows existence, length and columns well
+ * formedness.
+ *
* @param bu
* @throws IllegalArgumentException
* @throws IOException
*/
private void checkRowAndColumns(BatchUpdate bu)
throws IllegalArgumentException, IOException {
- if (bu.getRow() == null ||
- bu.getRow().length > HConstants.MAX_ROW_LENGTH) {
+ if (bu.getRow() == null || bu.getRow().length > HConstants.MAX_ROW_LENGTH) {
throw new IllegalArgumentException("Row key is invalid");
}
for (BatchOperation bo : bu) {
@@ -1063,6 +1136,46 @@
}
);
}
+
+ /**
+ * Get the value of autoFlush. If true, updates will not be buffered
+ * @return value of autoFlush
+ */
+ public boolean isAutoFlush() {
+ return autoFlush;
+ }
+
+ /**
+ * Set whether this instance of HTable will autoFlush
+ * @param autoFlush
+ */
+ public void setAutoFlush(boolean autoFlush) {
+ this.autoFlush = autoFlush;
+ }
+
+ /**
+ * Get the maximum size in bytes of the write buffer for this HTable
+ * @return the size of the write buffer in bytes
+ */
+ public long getWriteBufferSize() {
+ return writeBufferSize;
+ }
+
+ /**
+ * Set the size of the buffer in bytes
+ * @param writeBufferSize
+ */
+ public void setWriteBufferSize(long writeBufferSize) {
+ this.writeBufferSize = writeBufferSize;
+ }
+
+ /**
+ * Get the write buffer
+ * @return the current write buffer
+ */
+ public ArrayList<BatchUpdate> getWriteBuffer() {
+ return writeBuffer;
+ }
/**
* Implements the scanner interface for the HBase client.
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java Fri Oct 17 15:03:29 2008
@@ -28,7 +28,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
/**
* A Writable object that contains a series of BatchOperations
@@ -37,10 +37,12 @@
* can result in multiple BatchUpdate objects if the batch contains rows that
* are served by multiple region servers.
*/
-public class BatchUpdate implements Writable, Iterable<BatchOperation> {
+public class BatchUpdate implements WritableComparable<BatchUpdate>,
+ Iterable<BatchOperation> {
// the row being updated
private byte [] row = null;
+ private long size = 0;
// the batched operations
private ArrayList<BatchOperation> operations =
@@ -95,6 +97,7 @@
this.row = row;
this.timestamp = timestamp;
this.operations = new ArrayList<BatchOperation>();
+ this.size = (row == null)? 0: row.length;
}
/** @return the row */
@@ -103,6 +106,13 @@
}
/**
+ * @return BatchUpdate size in bytes.
+ */
+ public long getSize() {
+ return size;
+ }
+
+ /**
* @return the timestamp this BatchUpdate will be committed with.
*/
public long getTimestamp() {
@@ -201,6 +211,7 @@
// If null, the PUT becomes a DELETE operation.
throw new IllegalArgumentException("Passed value cannot be null");
}
+ size += val.length + column.length;
operations.add(new BatchOperation(column, val));
}
@@ -265,6 +276,7 @@
}
this.row = Bytes.readByteArray(in);
timestamp = in.readLong();
+ this.size = in.readLong();
int nOps = in.readInt();
for (int i = 0; i < nOps; i++) {
BatchOperation op = new BatchOperation();
@@ -276,9 +288,14 @@
public void write(final DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.row);
out.writeLong(timestamp);
+ out.writeLong(this.size);
out.writeInt(operations.size());
for (BatchOperation op: operations) {
op.write(out);
}
}
-}
+
+ public int compareTo(BatchUpdate o) {
+ return Bytes.compareTo(this.row, o.getRow());
+ }
+}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Fri Oct 17 15:03:29 2008
@@ -131,6 +131,7 @@
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
+ addToMap(BatchUpdate[].class, code++);
}
private Class<?> declaredClass;
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Fri Oct 17 15:03:29 2008
@@ -108,9 +108,19 @@
throws IOException;
/**
- * Delete all cells that match the passed row and column and whose
- * timestamp is equal-to or older than the passed timestamp.
- *
+ * Applies a batch of updates via one RPC for many rows
+ *
+ * @param regionName name of the region to update
+ * @param b BatchUpdate[]
+ * @throws IOException
+ */
+ public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
+ throws IOException;
+
+ /**
+ * Delete all cells that match the passed row and column and whose timestamp
+ * is equal-to or older than the passed timestamp.
+ *
* @param regionName region name
* @param row row key
* @param column column key
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java Fri Oct 17 15:03:29 2008
@@ -442,7 +442,6 @@
// it was assigned, and it's not a duplicate assignment, so take it out
// of the unassigned list.
master.regionManager.noLongerUnassigned(region);
-
if (region.isRootRegion()) {
// Store the Root Region location (in memory)
HServerAddress rootServer = serverInfo.getServerAddress();
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Oct 17 15:03:29 2008
@@ -1354,6 +1354,22 @@
//////////////////////////////////////////////////////////////////////////////
/**
+ * Batch update many rows and take splitsAndClosesLock so we don't get
+ * blocked while updating.
+ * @param bus
+ */
+ public void batchUpdate(BatchUpdate[] bus) throws IOException {
+ splitsAndClosesLock.readLock().lock();
+ try {
+ for (BatchUpdate bu : bus) {
+ batchUpdate(bu, null);
+ }
+ } finally {
+ splitsAndClosesLock.readLock().unlock();
+ }
+ }
+
+ /**
* @param b
* @throws IOException
*/
@@ -1465,32 +1481,29 @@
* the notify.
*/
private void checkResources() {
- if (this.memcacheSize.get() > this.blockingMemcacheSize) {
- requestFlush();
- doBlocking();
- }
- }
-
- private synchronized void doBlocking() {
boolean blocked = false;
while (this.memcacheSize.get() > this.blockingMemcacheSize) {
+ requestFlush();
if (!blocked) {
LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
- "' on region " + Bytes.toString(getRegionName()) + ": Memcache size " +
- StringUtils.humanReadableInt(this.memcacheSize.get()) +
- " is >= than blocking " +
- StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
+ "' on region " + Bytes.toString(getRegionName()) +
+ ": Memcache size " +
+ StringUtils.humanReadableInt(this.memcacheSize.get()) +
+ " is >= than blocking " +
+ StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
}
blocked = true;
- try {
- wait(threadWakeFrequency);
- } catch (InterruptedException e) {
- // continue;
+ synchronized(this) {
+ try {
+ wait(threadWakeFrequency);
+ } catch (InterruptedException e) {
+ // continue;
+ }
}
}
if (blocked) {
- LOG.info("Unblocking updates for region " + this + " '" +
- Thread.currentThread().getName() + "'");
+ LOG.info("Unblocking updates for region " + this + " '"
+ + Thread.currentThread().getName() + "'");
}
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Oct 17 15:03:29 2008
@@ -1125,6 +1125,7 @@
throws IOException {
if (b.getRow() == null)
throw new IllegalArgumentException("update has null row");
+
checkOpen();
this.requestCount.incrementAndGet();
HRegion region = getRegion(regionName);
@@ -1141,6 +1142,33 @@
}
}
+ public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
+ throws IOException {
+ int i = 0;
+ checkOpen();
+ try {
+ HRegion region = getRegion(regionName);
+ this.cacheFlusher.reclaimMemcacheMemory();
+ for (BatchUpdate batchUpdate : b) {
+ this.requestCount.incrementAndGet();
+ validateValuesLength(batchUpdate, region);
+ }
+ i+= b.length-1;
+ region.batchUpdate(b);
+ } catch (OutOfMemoryError error) {
+ abort();
+ LOG.fatal("Ran out of memory", error);
+ } catch(WrongRegionException ex) {
+ return i;
+ } catch (NotServingRegionException ex) {
+ return i;
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ return i;
+ }
+
/**
* Utility method to verify values length
* @param batchUpdate The update to verify
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java Fri Oct 17 15:03:29 2008
@@ -372,11 +372,13 @@
void testSetup() throws IOException {
this.admin = new HBaseAdmin(conf);
this.table = new HTable(conf, tableDescriptor.getName());
+ this.table.setAutoFlush(false);
+ this.table.setWriteBufferSize(1024*1024*12);
}
@SuppressWarnings("unused")
void testTakedown() throws IOException {
- // Empty
+ this.table.flushCommits();
}
/*
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java Fri Oct 17 15:03:29 2008
@@ -55,8 +55,8 @@
*/
public TestBatchUpdate() throws UnsupportedEncodingException {
super();
- value = "abcd".getBytes(HConstants.UTF8_ENCODING);
- smallValue = "a".getBytes(HConstants.UTF8_ENCODING);
+ value = Bytes.toBytes("abcd");
+ smallValue = Bytes.toBytes("a");
}
@Override
@@ -153,4 +153,62 @@
fail("This is unexpected : " + e);
}
}
+
+ public void testRowsBatchUpdateBufferedOneFlush() {
+ table.setAutoFlush(false);
+ ArrayList<BatchUpdate> rowsUpdate = new ArrayList<BatchUpdate>();
+ for(int i = 0; i < NB_BATCH_ROWS*10; i++) {
+ BatchUpdate batchUpdate = new BatchUpdate("row"+i);
+ batchUpdate.put(CONTENTS, value);
+ rowsUpdate.add(batchUpdate);
+ }
+ try {
+ table.commit(rowsUpdate);
+
+ byte [][] columns = { CONTENTS };
+ Scanner scanner = table.getScanner(columns, HConstants.EMPTY_START_ROW);
+ int nbRows = 0;
+ for(RowResult row : scanner)
+ nbRows++;
+ assertEquals(0, nbRows);
+ scanner.close();
+
+ table.flushCommits();
+
+ scanner = table.getScanner(columns, HConstants.EMPTY_START_ROW);
+ nbRows = 0;
+ for(RowResult row : scanner)
+ nbRows++;
+ assertEquals(NB_BATCH_ROWS*10, nbRows);
+ } catch (IOException e) {
+ fail("This is unexpected : " + e);
+ }
+ }
+
+ public void testRowsBatchUpdateBufferedManyManyFlushes() {
+ table.setAutoFlush(false);
+ table.setWriteBufferSize(10);
+ ArrayList<BatchUpdate> rowsUpdate = new ArrayList<BatchUpdate>();
+ for(int i = 0; i < NB_BATCH_ROWS*10; i++) {
+ BatchUpdate batchUpdate = new BatchUpdate("row"+i);
+ batchUpdate.put(CONTENTS, value);
+ rowsUpdate.add(batchUpdate);
+ }
+ try {
+ table.commit(rowsUpdate);
+
+ table.flushCommits();
+
+ byte [][] columns = { CONTENTS };
+ Scanner scanner = table.getScanner(columns, HConstants.EMPTY_START_ROW);
+ int nbRows = 0;
+ for(RowResult row : scanner)
+ nbRows++;
+ assertEquals(NB_BATCH_ROWS*10, nbRows);
+ } catch (IOException e) {
+ fail("This is unexpected : " + e);
+ }
+ }
+
+
}