You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2008/10/18 00:03:30 UTC

svn commit: r705770 - in /hadoop/hbase/trunk: ./ conf/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop/hbase/regions...

Author: jdcryans
Date: Fri Oct 17 15:03:29 2008
New Revision: 705770

URL: http://svn.apache.org/viewvc?rev=705770&view=rev
Log:
HBASE-748   Add an efficient way to batch update many rows

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/conf/hbase-default.xml
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Oct 17 15:03:29 2008
@@ -54,6 +54,7 @@
                (Andrzej Bialecki via Stack)
 
   OPTIMIZATIONS
+   HBASE-748   Add an efficient way to batch update many rows
    HBASE-887   Fix a hotspot in scanners
 
 Release 0.18.0 - September 21st, 2008

Modified: hadoop/hbase/trunk/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/conf/hbase-default.xml?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/conf/hbase-default.xml (original)
+++ hadoop/hbase/trunk/conf/hbase-default.xml Fri Oct 17 15:03:29 2008
@@ -52,6 +52,13 @@
     </description>
   </property>
   <property>
+    <name>hbase.client.write.buffer</name>
+    <value>10485760</value>
+    <description>Size of the write buffer in bytes. A bigger buffer takes more
+    memory but reduces the number of RPCs.
+    </description>
+  </property>
+  <property>
     <name>hbase.master.meta.thread.rescanfrequency</name>
     <value>60000</value>
     <description>How long the HMaster sleeps (in milliseconds) between scans of

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnection.java Fri Oct 17 15:03:29 2008
@@ -138,4 +138,16 @@
    */
   public <T> T getRegionServerWithRetries(ServerCallable<T> callable) 
   throws IOException, RuntimeException;
+  
+  /**
+   * Pass in a ServerCallable with your particular bit of logic defined and
+   * this method will invoke it once on the defined region server, no retries.
+   * @param <T> the type of the return value
+   * @param callable
+   * @return an object of type T
+   * @throws IOException
+   * @throws RuntimeException
+   */
+  public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable) 
+  throws IOException, RuntimeException;
 }
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Oct 17 15:03:29 2008
@@ -460,7 +460,6 @@
       final byte [] tableName, final byte [] row, boolean useCache)
     throws IOException{
       HRegionLocation location = null;
-      
       // if we're supposed to be using the cache, then check it for a possible
       // hit. otherwise, delete any existing cached location so it won't 
       // interfere.
@@ -472,7 +471,7 @@
       } else {
         deleteCachedLocation(tableName, row);
       }
-
+      
       // build the key of the meta region we should be looking for.
       // the extra 9's on the end are necessary to allow "exact" matches
       // without knowing the precise region names.
@@ -879,6 +878,29 @@
       }
       return null;    
     }
+    
+    public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable)
+        throws IOException, RuntimeException {
+      getMaster();
+      try {
+        callable.instantiateServer(false);
+        return callable.call();
+      } catch (Throwable t) {
+        if (t instanceof UndeclaredThrowableException) {
+          t = t.getCause();
+        }
+        if (t instanceof RemoteException) {
+          t = RemoteExceptionHandler.decodeRemoteException((RemoteException) t);
+        }
+        if (t instanceof DoNotRetryIOException) {
+          throw (DoNotRetryIOException) t;
+        }
+        if (t instanceof IOException) {
+          throw (IOException) t;
+        }
+      }
+      return null;    
+    }
 
     void close(boolean stopProxy) {
       if (master != null) {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Fri Oct 17 15:03:29 2008
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -54,6 +55,10 @@
   private final HConnection connection;
   private final byte [] tableName;
   private HBaseConfiguration configuration;
+  private ArrayList<BatchUpdate> writeBuffer;
+  private long writeBufferSize;
+  private boolean autoFlush;
+  private long currentWriteBufferSize;
   private int scannerCaching;
 
   /**
@@ -103,6 +108,11 @@
     this.configuration = conf;
     this.tableName = tableName;
     this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
+    this.writeBuffer = new ArrayList<BatchUpdate>();
+    this.writeBufferSize = 
+      this.configuration.getLong("hbase.client.write.buffer", 10485760);
+    this.autoFlush = true;
+    this.currentWriteBufferSize = 0;
     this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 30);
   }
 
@@ -964,6 +974,7 @@
 
   /**
    * Commit a BatchUpdate to the table.
+   * If autoFlush is false, the update is buffered
    * @param batchUpdate
    * @throws IOException
    */ 
@@ -974,6 +985,7 @@
   
   /**
    * Commit a BatchUpdate to the table using existing row lock.
+   * If autoFlush is false, the update is buffered
    * @param batchUpdate
    * @param rl Existing row lock
    * @throws IOException
@@ -982,43 +994,104 @@
       final RowLock rl) 
   throws IOException {
     checkRowAndColumns(batchUpdate);
-    connection.getRegionServerWithRetries(
-      new ServerCallable<Boolean>(connection, tableName, batchUpdate.getRow()) {
-        public Boolean call() throws IOException {
-          long lockId = -1L;
-          if(rl != null) {
-            lockId = rl.getLockId();
-          }
-          server.batchUpdate(location.getRegionInfo().getRegionName(), 
-            batchUpdate, lockId);
-          return null;
-        }
-      }
-    );  
+    writeBuffer.add(batchUpdate);
+    currentWriteBufferSize += batchUpdate.getSize();
+    if(autoFlush || currentWriteBufferSize > writeBufferSize) {
+      flushCommits();
+    }
   }
   
   /**
-   * Commit a RowsBatchUpdate to the table.
+   * Commit a List of BatchUpdate to the table.
+   * If autoFlush is false, the updates are buffered
    * @param batchUpdates
    * @throws IOException
    */ 
-  public synchronized void commit(final List<BatchUpdate> batchUpdates) 
-  throws IOException {
-    for (BatchUpdate batchUpdate : batchUpdates) 
-      commit(batchUpdate,null);
+  public synchronized void commit(final List<BatchUpdate> batchUpdates)
+      throws IOException {
+    for(BatchUpdate bu : batchUpdates) {
+      checkRowAndColumns(bu);
+      writeBuffer.add(bu);
+      currentWriteBufferSize += bu.getSize();
+    }
+    if(autoFlush || currentWriteBufferSize > writeBufferSize) {
+      flushCommits();
+    }
   }
   
   /**
-   * Utility method that checks rows existence, length and 
-   * columns well formedness.
+   * Commit to the table the buffer of BatchUpdate.
+   * Called automatically in the commit methods when autoFlush is true.
+   * @throws IOException
+   */
+  public void flushCommits() throws IOException {
+    try {
+      // See HBASE-748 for pseudo code of this method
+      if (writeBuffer.isEmpty()) {
+        return;
+      }
+      Collections.sort(writeBuffer);
+      List<BatchUpdate> tempUpdates = new ArrayList<BatchUpdate>();
+      byte[] currentRegion = connection.getRegionLocation(tableName,
+          writeBuffer.get(0).getRow(), false).getRegionInfo().getRegionName();
+      byte[] region = currentRegion;
+      boolean isLastRow = false;
+      for (int i = 0; i < writeBuffer.size(); i++) {
+        BatchUpdate batchUpdate = writeBuffer.get(i);
+        tempUpdates.add(batchUpdate);
+        isLastRow = (i + 1) == writeBuffer.size();
+        if (!isLastRow) {
+          region = connection.getRegionLocation(tableName,
+              writeBuffer.get(i + 1).getRow(), false).getRegionInfo()
+              .getRegionName();
+        }
+        if (!Bytes.equals(currentRegion, region) || isLastRow) {
+          final BatchUpdate[] updates = tempUpdates.toArray(new BatchUpdate[0]);
+          int index = connection
+              .getRegionServerForWithoutRetries(new ServerCallable<Integer>(
+                  connection, tableName, batchUpdate.getRow()) {
+                public Integer call() throws IOException {
+                  int i = server.batchUpdates(location.getRegionInfo()
+                      .getRegionName(), updates);
+                  return i;
+                }
+              });
+          if (index != updates.length - 1) {
+            // Basic waiting time. If many updates are flushed, tests have shown
+            // that this is barely needed but when committing 1 update this may
+            // get retried hundreds of times.
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+              // continue
+            }
+            i = i - updates.length + index;
+            region = connection.getRegionLocation(tableName,
+                writeBuffer.get(i + 1).getRow(), true).getRegionInfo()
+                .getRegionName();
+
+          }
+          currentRegion = region;
+          tempUpdates.clear();
+        }
+      }
+    } finally {
+      currentWriteBufferSize = 0;
+      writeBuffer.clear();
+    }
+  }
+
+  /**
+   * Utility method that checks rows existence, length and columns well
+   * formedness.
+   * 
    * @param bu
    * @throws IllegalArgumentException
    * @throws IOException
    */
   private void checkRowAndColumns(BatchUpdate bu)
       throws IllegalArgumentException, IOException {
-    if (bu.getRow() == null || 
-        bu.getRow().length > HConstants.MAX_ROW_LENGTH) {
+    if (bu.getRow() == null || bu.getRow().length > HConstants.MAX_ROW_LENGTH) {
       throw new IllegalArgumentException("Row key is invalid");
     }
     for (BatchOperation bo : bu) {
@@ -1063,6 +1136,46 @@
       }
     );
   }
+  
+  /**
+   * Get the value of autoFlush. If true, updates will not be buffered
+   * @return value of autoFlush
+   */
+  public boolean isAutoFlush() {
+    return autoFlush;
+  }
+
+  /**
+   * Set whether this instance of HTable will autoFlush
+   * @param autoFlush
+   */
+  public void setAutoFlush(boolean autoFlush) {
+    this.autoFlush = autoFlush;
+  }
+
+  /**
+   * Get the maximum size in bytes of the write buffer for this HTable
+   * @return the size of the write buffer in bytes
+   */
+  public long getWriteBufferSize() {
+    return writeBufferSize;
+  }
+
+  /**
+   * Set the size of the buffer in bytes
+   * @param writeBufferSize
+   */
+  public void setWriteBufferSize(long writeBufferSize) {
+    this.writeBufferSize = writeBufferSize;
+  }
+
+  /**
+   * Get the write buffer 
+   * @return the current write buffer
+   */
+  public ArrayList<BatchUpdate> getWriteBuffer() {
+    return writeBuffer;
+  }
 
   /**
    * Implements the scanner interface for the HBase client.

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java Fri Oct 17 15:03:29 2008
@@ -28,7 +28,7 @@
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 /**
  * A Writable object that contains a series of BatchOperations
@@ -37,10 +37,12 @@
  * can result in multiple BatchUpdate objects if the batch contains rows that
  * are served by multiple region servers.
  */
-public class BatchUpdate implements Writable, Iterable<BatchOperation> {
+public class BatchUpdate implements WritableComparable<BatchUpdate>, 
+  Iterable<BatchOperation> {
   
   // the row being updated
   private byte [] row = null;
+  private long size = 0;
     
   // the batched operations
   private ArrayList<BatchOperation> operations =
@@ -95,6 +97,7 @@
     this.row = row;
     this.timestamp = timestamp;
     this.operations = new ArrayList<BatchOperation>();
+    this.size = (row == null)? 0: row.length;
   }
 
   /** @return the row */
@@ -103,6 +106,13 @@
   }
 
   /**
+   * @return BatchUpdate size in bytes.
+   */
+  public long getSize() {
+    return size;
+  }
+
+  /**
    * @return the timestamp this BatchUpdate will be committed with.
    */
   public long getTimestamp() {
@@ -201,6 +211,7 @@
       // If null, the PUT becomes a DELETE operation.
       throw new IllegalArgumentException("Passed value cannot be null");
     }
+    size += val.length + column.length;
     operations.add(new BatchOperation(column, val));
   }
 
@@ -265,6 +276,7 @@
     }
     this.row = Bytes.readByteArray(in);
     timestamp = in.readLong();
+    this.size = in.readLong();
     int nOps = in.readInt();
     for (int i = 0; i < nOps; i++) {
       BatchOperation op = new BatchOperation();
@@ -276,9 +288,14 @@
   public void write(final DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.row);
     out.writeLong(timestamp);
+    out.writeLong(this.size);
     out.writeInt(operations.size());
     for (BatchOperation op: operations) {
       op.write(out);
     }
   }
-}
+
+  public int compareTo(BatchUpdate o) {
+    return Bytes.compareTo(this.row, o.getRow());
+  }
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Fri Oct 17 15:03:29 2008
@@ -131,6 +131,7 @@
     } catch (ClassNotFoundException e) {
       e.printStackTrace();
     }
+    addToMap(BatchUpdate[].class, code++);
   }
   
   private Class<?> declaredClass;

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Fri Oct 17 15:03:29 2008
@@ -108,9 +108,19 @@
   throws IOException;
   
   /**
-   * Delete all cells that match the passed row and column and whose
-   * timestamp is equal-to or older than the passed timestamp.
-   *
+   * Applies a batch of updates via one RPC for many rows
+   * 
+   * @param regionName name of the region to update
+   * @param b BatchUpdate[]
+   * @throws IOException
+   */
+  public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
+  throws IOException;
+  
+  /**
+   * Delete all cells that match the passed row and column and whose timestamp
+   * is equal-to or older than the passed timestamp.
+   * 
    * @param regionName region name
    * @param row row key
    * @param column column key

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java Fri Oct 17 15:03:29 2008
@@ -442,7 +442,6 @@
       // it was assigned, and it's not a duplicate assignment, so take it out 
       // of the unassigned list.
       master.regionManager.noLongerUnassigned(region);
-
       if (region.isRootRegion()) {
         // Store the Root Region location (in memory)
         HServerAddress rootServer = serverInfo.getServerAddress();

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Oct 17 15:03:29 2008
@@ -1354,6 +1354,22 @@
   //////////////////////////////////////////////////////////////////////////////
   
   /**
+   * Batch update many rows and take splitsAndClosesLock so we don't get 
+   * blocked while updating.
+   * @param bus 
+   */
+  public void batchUpdate(BatchUpdate[] bus) throws IOException {
+    splitsAndClosesLock.readLock().lock();
+    try {
+      for (BatchUpdate bu : bus) {
+        batchUpdate(bu, null);
+      }
+    } finally {
+      splitsAndClosesLock.readLock().unlock();
+    }
+  }
+  
+  /**
    * @param b
    * @throws IOException
    */
@@ -1465,32 +1481,29 @@
    * the notify.
    */
   private void checkResources() {
-    if (this.memcacheSize.get() > this.blockingMemcacheSize) {
-      requestFlush();
-      doBlocking();
-    }
-  }
-  
-  private synchronized void doBlocking() {
     boolean blocked = false;
     while (this.memcacheSize.get() > this.blockingMemcacheSize) {
+      requestFlush();
       if (!blocked) {
         LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
-            "' on region " + Bytes.toString(getRegionName()) + ": Memcache size " +
-            StringUtils.humanReadableInt(this.memcacheSize.get()) +
-            " is >= than blocking " +
-            StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
+          "' on region " + Bytes.toString(getRegionName()) +
+          ": Memcache size " +
+          StringUtils.humanReadableInt(this.memcacheSize.get()) +
+          " is >= than blocking " +
+          StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
       }
       blocked = true;
-      try {
-        wait(threadWakeFrequency);
-      } catch (InterruptedException e) {
-        // continue;
+      synchronized(this) {
+        try {
+          wait(threadWakeFrequency);
+        } catch (InterruptedException e) {
+          // continue;
+        }
       }
     }
     if (blocked) {
-      LOG.info("Unblocking updates for region " + this + " '" + 
-        Thread.currentThread().getName() + "'");
+      LOG.info("Unblocking updates for region " + this + " '"
+          + Thread.currentThread().getName() + "'");
     }
   }
   

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Oct 17 15:03:29 2008
@@ -1125,6 +1125,7 @@
   throws IOException {
     if (b.getRow() == null)
       throw new IllegalArgumentException("update has null row");
+    
     checkOpen();
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
@@ -1141,6 +1142,33 @@
     }
   }
   
+  public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
+  throws IOException {
+    int i = 0;
+    checkOpen();
+    try {
+      HRegion region = getRegion(regionName);
+      this.cacheFlusher.reclaimMemcacheMemory();
+      for (BatchUpdate batchUpdate : b) {
+        this.requestCount.incrementAndGet();
+        validateValuesLength(batchUpdate, region);
+      }
+      i+= b.length-1;
+      region.batchUpdate(b);
+    } catch (OutOfMemoryError error) {
+      abort();
+      LOG.fatal("Ran out of memory", error);
+    } catch(WrongRegionException ex) {
+      return i;
+    } catch (NotServingRegionException ex) {
+      return i;
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+    return i;
+  }
+  
   /**
    * Utility method to verify values length
    * @param batchUpdate The update to verify

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java Fri Oct 17 15:03:29 2008
@@ -372,11 +372,13 @@
     void testSetup() throws IOException {
       this.admin = new HBaseAdmin(conf);
       this.table = new HTable(conf, tableDescriptor.getName());
+      this.table.setAutoFlush(false);
+      this.table.setWriteBufferSize(1024*1024*12);
     }
 
     @SuppressWarnings("unused")
     void testTakedown()  throws IOException {
-      // Empty
+      this.table.flushCommits();
     }
     
     /*

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java?rev=705770&r1=705769&r2=705770&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestBatchUpdate.java Fri Oct 17 15:03:29 2008
@@ -55,8 +55,8 @@
    */
   public TestBatchUpdate() throws UnsupportedEncodingException {
     super();
-    value = "abcd".getBytes(HConstants.UTF8_ENCODING);
-    smallValue = "a".getBytes(HConstants.UTF8_ENCODING);
+    value = Bytes.toBytes("abcd");
+    smallValue = Bytes.toBytes("a");
   }
   
   @Override
@@ -153,4 +153,62 @@
       fail("This is unexpected : " + e);
     }
   }
+  
+  public void testRowsBatchUpdateBufferedOneFlush() {
+    table.setAutoFlush(false);
+    ArrayList<BatchUpdate> rowsUpdate = new ArrayList<BatchUpdate>();
+    for(int i = 0; i < NB_BATCH_ROWS*10; i++) {
+      BatchUpdate batchUpdate = new BatchUpdate("row"+i);
+      batchUpdate.put(CONTENTS, value);
+      rowsUpdate.add(batchUpdate);
+    }
+    try {
+      table.commit(rowsUpdate);  
+    
+      byte [][] columns = { CONTENTS };
+      Scanner scanner = table.getScanner(columns, HConstants.EMPTY_START_ROW);
+      int nbRows = 0;
+      for(RowResult row : scanner)
+        nbRows++;
+      assertEquals(0, nbRows);  
+      scanner.close();
+      
+      table.flushCommits();
+      
+      scanner = table.getScanner(columns, HConstants.EMPTY_START_ROW);
+      nbRows = 0;
+      for(RowResult row : scanner)
+        nbRows++;
+      assertEquals(NB_BATCH_ROWS*10, nbRows);
+    } catch (IOException e) {
+      fail("This is unexpected : " + e);
+    }
+  }
+  
+  public void testRowsBatchUpdateBufferedManyManyFlushes() {
+    table.setAutoFlush(false);
+    table.setWriteBufferSize(10);
+    ArrayList<BatchUpdate> rowsUpdate = new ArrayList<BatchUpdate>();
+    for(int i = 0; i < NB_BATCH_ROWS*10; i++) {
+      BatchUpdate batchUpdate = new BatchUpdate("row"+i);
+      batchUpdate.put(CONTENTS, value);
+      rowsUpdate.add(batchUpdate);
+    }
+    try {
+      table.commit(rowsUpdate);
+      
+      table.flushCommits();
+      
+      byte [][] columns = { CONTENTS };
+      Scanner scanner = table.getScanner(columns, HConstants.EMPTY_START_ROW);
+      int nbRows = 0;
+      for(RowResult row : scanner)
+        nbRows++;
+      assertEquals(NB_BATCH_ROWS*10, nbRows);
+    } catch (IOException e) {
+      fail("This is unexpected : " + e);
+    }
+  }
+  
+  
 }