You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/09/18 20:40:21 UTC

svn commit: r1387312 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/...

Author: mbautin
Date: Tue Sep 18 18:40:20 2012
New Revision: 1387312

URL: http://svn.apache.org/viewvc?rev=1387312&view=rev
Log:
[HBASE-5783] [0.89-fb] Reliably bulk load Puts/Deletes to HBase, without WAL.

Author: aaiyer

Summary:
Currently, there are 2 ways to load data *reliably* into HBase:
(i) individual Puts/Gets to HBase with WAL
(ii) MapReduce based Bulk Load mechanism

Option (i) is slow due to WALs. Option (ii) is slow due to the map
reduce merges doing everything through disk.

This is a hybrid approach of writing everything, directly to memstore,
without WAL. And, then waiting/initiating a memstore flush to provide
durability.

Test Plan:
add a new unit test
 TestBatchedUpload

16 /tmp/2012-09-18_09_07_29_hbase-89int_newHiveUpload_v5-mrFailures  Failed on MR:  TestBatchedUpload TestFromClientSide TestHRegionCloseRetry TestHRegionServerFileSystemFailure TestHsHaServerCmdLine TestMasterFailover TestMultiClusters TestNativeThriftClient TestRegionSplitter TestReplication TestSchemaModificationLocks TestThreadPoolServerFramedCmdLine TestThreadPoolServerUnframedCmdLine TestThreadedSelectorServerCmdLine TestThriftServer TestTimestamp
1 /tmp/2012-09-18_09_07_29_hbase-89int_newHiveUpload_v5-localFailures  Failed on Local:  None.

Reviewers: kranganathan, liyintang

Reviewed By: kranganathan

CC: hbase-eng@, mbautin, gqchen

Differential Revision: https://phabricator.fb.com/D570100

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientSideDoNotRetryException.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallableForBatchOps.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientSideDoNotRetryException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientSideDoNotRetryException.java?rev=1387312&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientSideDoNotRetryException.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientSideDoNotRetryException.java Tue Sep 18 18:40:20 2012
@@ -0,0 +1,31 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+/**
+ * Exceptions that extend will not be  retried at the Client end.
+ */
+public class ClientSideDoNotRetryException extends IOException {
+  public ClientSideDoNotRetryException (String msg) {
+    super(msg);
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1387312&r1=1387311&r2=1387312&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Tue Sep 18 18:40:20 2012
@@ -356,4 +356,24 @@ public interface HConnection extends Clo
    */
   public void prewarmRegionCache(final byte[] tableName,
       final Map<HRegionInfo, HServerAddress> regions);
+
+  /**
+   * Starts tracking the updates made to the tableName so that
+   * we can ensure that the updates were completed and flushed to
+   * disk at the end of the job.
+   * @param tableName -- the table for which we should start tracking
+   */
+  public void startBatchedLoad(byte[] tableName);
+
+  /**
+   * Ensure that all the updates made to the table, since
+   * startBatchedLoad was called are persisted. This method
+   * waits for all the regionservers contacted to
+   * flush all the data written so far. If this doesn't happen
+   * within a configurable amount of time, it requests the regions
+   * to flush.
+   * @param tableName -- tableName to flush all puts/deletes for.
+   * @param options -- hbase rpc options to use when talking to regionservers
+   */
+  public void endBatchedLoad(byte[] tableName, HBaseRPCOptions options) throws IOException;
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1387312&r1=1387311&r2=1387312&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Sep 18 18:40:20 2012
@@ -37,10 +37,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -74,10 +76,15 @@ import org.apache.hadoop.hbase.ipc.HRegi
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionOverloadedException;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MetaUtils;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -434,6 +441,11 @@ public class HConnectionManager {
     // created for, we do not want the thrift layer to hold up IPC threads handling retries.
     private int maxServerRequestedRetries;
 
+    // keep track of servers that have been updated for batchedLoad
+    // tablename -> Map
+    Map<String, ConcurrentMap<HRegionInfo, HServerAddress>> batchedUploadUpdatesMap;
+    private int batchedUploadSoftFlushRetries;
+    private long batchedUploadSoftFlushTimeoutMillis;
     /**
      * constructor
      * @param conf Configuration object
@@ -479,6 +491,12 @@ public class HConnectionManager {
 
       this.master = null;
       this.masterChecked = false;
+      this.batchedUploadSoftFlushRetries =
+          conf.getInt("hbase.client.batched-upload.softflush.retries", 10);
+      this.batchedUploadSoftFlushTimeoutMillis =
+          conf.getLong("hbase.client.batched-upload.softflush.timeout.ms", 60000L); // 1 min
+      batchedUploadUpdatesMap  = new ConcurrentHashMap<String,
+          ConcurrentMap<HRegionInfo, HServerAddress>>();
     }
 
     private long getPauseTime(int tries) {
@@ -1445,6 +1463,9 @@ public class HConnectionManager {
           roe = ex;
           serverRequestedWaitTime = roe.getBackoffTimeMillis();
           continue;
+        } catch (ClientSideDoNotRetryException exp) {
+          // Bail out of the retry loop, immediately
+          throw exp;
         } catch (PreemptiveFastFailException pfe) {
           // Bail out of the retry loop, if the host has been consistently unreachable.
           throw pfe;
@@ -1522,7 +1543,9 @@ public class HConnectionManager {
       boolean couldNotCommunicateWithServer = false;
       boolean retryDespiteFastFailMode = false;
       try {
-        if (instantiateRegionLocation) callable.instantiateRegionLocation(false);
+        if (instantiateRegionLocation) {
+          callable.instantiateRegionLocation(false);
+        }
 
         // Logic to fast fail requests to unreachable servers.
         server = callable.getServerAddress();
@@ -1542,6 +1565,8 @@ public class HConnectionManager {
         return callable.call();
       } catch (PreemptiveFastFailException pfe) {
         throw pfe;
+      } catch (ClientSideDoNotRetryException exp) {
+        throw exp;
       } catch (Throwable t1) {
         Throwable t2 = translateException(t1);
         boolean isLocalException = !(t2 instanceof RemoteException);
@@ -1696,32 +1721,70 @@ public class HConnectionManager {
       occasionallyCleanupFailureInformation();
     }
 
+    private Callable<Long> createCurrentTimeCallable(
+        final HServerAddress address,
+        final HBaseRPCOptions options) {
+      final HConnection connection = this;
+      return new Callable<Long>() {
+        public Long call() throws IOException {
+          return getRegionServerWithoutRetries(
+            new ServerCallableForBatchOps<Long>(connection, address, options) {
+              public Long call() throws IOException {
+                return server.getCurrentTimeMillis();
+              }
+            });
+        }
+      };
+    }
+
+
+    private Callable<MapWritable> createGetLastFlushTimesCallable(
+        final HServerAddress address,
+        final HBaseRPCOptions options) {
+      final HConnection connection = this;
+      return new Callable<MapWritable>() {
+        public MapWritable call() throws IOException {
+          return getRegionServerWithoutRetries(
+            new ServerCallableForBatchOps<MapWritable>(connection, address, options) {
+              public MapWritable call() throws IOException {
+                return server.getLastFlushTimes();
+              }
+            });
+        }
+      };
+    }
+
+    private Callable<Void> createFlushCallable(final HServerAddress address,
+        final HRegionInfo region,
+        final long targetFlushTime,
+        final HBaseRPCOptions options) {
+      final HConnection connection = this;
+      return new Callable<Void>() {
+        public Void  call() throws IOException {
+          return getRegionServerWithoutRetries(
+            new ServerCallableForBatchOps<Void>(connection, address, options) {
+              public Void call() throws IOException {
+                server.flushRegion(region.getRegionName(), targetFlushTime);
+                return null;
+              }
+            });
+      }};
+    }
+
+
     private <R> Callable<MultiResponse> createMultiActionCallable(final HServerAddress address,
         final MultiAction multi, final byte [] tableName,
         final HBaseRPCOptions options) {
       final HConnection connection = this;
+      // no need to track mutations here. Done at the caller.
       return new Callable<MultiResponse>() {
        public MultiResponse call() throws IOException {
          return getRegionServerWithoutRetries(
-             new ServerCallable<MultiResponse>(connection, tableName, null, options) {
+             new ServerCallableForBatchOps<MultiResponse>(connection, address, options) {
                public MultiResponse call() throws IOException {
                  return server.multiAction(multi);
                }
-
-               @Override
-               public void instantiateRegionLocation(boolean reload) throws IOException {
-                  // we don't need to locate the region, since we have been given the address of the
-                  // server. But, let us store the information in this.location, so that
-                  // we can handle failures (i.e. clear cache) if we fail to connect to the RS.
-                  this.location = new HRegionLocation(null, address);
-               }
-
-               @Override
-               public void instantiateServer() throws IOException {
-                 server = connection.getHRegionConnection(address, options);
-               }
-             }
-         );
+             });
        }
      };
    }
@@ -1768,6 +1831,7 @@ public class HConnectionManager {
          Row row = workingList.get(i);
          if (row != null) {
            HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
+
            byte[] regionName = loc.getRegionInfo().getRegionName();
 
            MultiAction actions = actionsByServer.get(loc.getServerAddress());
@@ -1779,6 +1843,8 @@ public class HConnectionManager {
            if (isGets) {
              actions.addGet(regionName, (Get)row, i);
            } else {
+             trackMutationsToTable(tableName,
+                 loc.getRegionInfo(), loc.getServerAddress());
              actions.mutate(regionName, (Mutation)row);
            }
          }
@@ -2204,6 +2270,9 @@ public class HConnectionManager {
           return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
               tableName, row, options) {
             public Integer call() throws IOException {
+              trackMutationsToTable(tableName,
+                  location.getRegionInfo(),
+                  location.getServerAddress());
               return server.put(location.getRegionInfo().getRegionName(), puts);
             }
           });
@@ -2259,6 +2328,9 @@ public class HConnectionManager {
           getRegionServerWithRetries(new ServerCallable<Void>(this.c,
                 tableName, row, options) {
               public Void call() throws IOException {
+                trackMutationsToTable(tableName,
+                    location.getRegionInfo(),
+                    location.getServerAddress());
                 server.mutateRow(location.getRegionInfo().getRegionName(),
                   mutations);
                 return null;
@@ -2284,6 +2356,9 @@ public class HConnectionManager {
           return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
                 tableName, row, options) {
               public Integer call() throws IOException {
+                trackMutationsToTable(tableName,
+                    location.getRegionInfo(),
+                    location.getServerAddress());
                 return server.delete(location.getRegionInfo().getRegionName(),
                   deletes);
               }
@@ -2326,6 +2401,8 @@ public class HConnectionManager {
           regionPuts.put(address, mput);
         }
         mput.add(regionName, put);
+        trackMutationsToTable(tableName,
+            loc.getRegionInfo(), loc.getServerAddress());
       }
 
       return new ArrayList<MultiPut>(regionPuts.values());
@@ -2379,6 +2456,10 @@ public class HConnectionManager {
               ex.getCause() instanceof PreemptiveFastFailException) {
             throw (PreemptiveFastFailException)ex.getCause();
           }
+
+          if (ex.getCause() instanceof ClientSideDoNotRetryException) {
+            throw (ClientSideDoNotRetryException)ex.getCause();
+          }
         }
 
         // For each region
@@ -2483,6 +2564,9 @@ public class HConnectionManager {
           roe = ex;
           serverRequestedWaitTime = roe.getBackoffTimeMillis();
           continue;
+        } catch (ClientSideDoNotRetryException exp) {
+          // Bail out of the retry loop, immediately
+          throw exp;
         } catch (PreemptiveFastFailException pfe) {
           throw pfe;
         }
@@ -2527,28 +2611,14 @@ public class HConnectionManager {
       return new Callable<MultiPutResponse>() {
         public MultiPutResponse call() throws IOException {
           return getRegionServerWithoutRetries(
-              new ServerCallable<MultiPutResponse>(connection, null, null, options) {
+              new ServerCallableForBatchOps<MultiPutResponse>(connection, address, options) {
                 public MultiPutResponse call() throws IOException {
+                  // caller of createPutCallable is responsible to track mutations.
                   MultiPutResponse resp = server.multiPut(puts);
                   resp.request = puts;
                   return resp;
                 }
-                @Override
-                public void instantiateRegionLocation(boolean reload)
-                    throws IOException {
-                  // we don't need to locate the region, since we have been given the address of the
-                  // server. But, let us store the information in this.location, so that
-                  // we can handle failures (i.e. clear cache) if we fail to connect to the RS.
-                  // (see HConnectionManager.getRegionServerWithRetries for how it is used to
-                  // call deleteCachedLocation.)
-                  this.location = new HRegionLocation(null, address);
-                }
-                @Override
-                public void instantiateServer() throws IOException {
-                  server = connection.getHRegionConnection(address, options);
-                }
-              }
-          );
+              });
         }
       };
     }
@@ -2616,5 +2686,266 @@ public class HConnectionManager {
             new HRegionLocation(e.getKey(), e.getValue()));
       }
     }
+
+    @Override
+    public void startBatchedLoad(byte[] tableName) {
+      batchedUploadUpdatesMap.put(Bytes.toString(tableName),
+          new ConcurrentHashMap<HRegionInfo, HServerAddress>());
+    }
+
+    @Override
+    public void endBatchedLoad(byte[] tableName, HBaseRPCOptions options) throws IOException {
+      Map<HRegionInfo, HServerAddress> regionsUpdated = getRegionsUpdated(tableName);
+
+      // get the current TS from the RegionServer
+      Map<HRegionInfo, Long> targetTSMap = getCurrentTimeForRegions(regionsUpdated, options);
+
+      // loop to ensure that we have flushed beyond the corresponding TS.
+      int tries = 0;
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      long waitUntilForHardFlush = now + batchedUploadSoftFlushTimeoutMillis;
+
+      while (tries++ < this.batchedUploadSoftFlushRetries
+          && now < waitUntilForHardFlush) {
+        // get the lastFlushedTS from the RegionServer. throws Exception if the region has
+        // moved elsewhere
+        Map<HRegionInfo, Long> flushedTSMap =
+            getRegionFlushTimes(regionsUpdated, options);
+
+        for (Entry<HRegionInfo, Long> entry: targetTSMap.entrySet()) {
+          HRegionInfo region = entry.getKey();
+          long targetTime = entry.getValue().longValue();
+          long flushedTime = flushedTSMap.get(region).longValue();
+          if (flushedTime > targetTime) {
+            targetTSMap.remove(region);
+          }
+          LOG.debug("Region " + region.getEncodedName() + " was flushed at "
+              + flushedTime
+              + (flushedTime > targetTime
+                  ? ". All updates we made are already on disk."
+                  : ". Still waiting for updates to go to the disk.")
+              + " Last update was made "
+              + (flushedTime > targetTime
+                  ? ((flushedTime - targetTime) + " ms before flush.")
+                  : ((targetTime - flushedTime) + " ms after flush.")));
+        }
+
+        if (targetTSMap.isEmpty()) {
+          LOG.info("All regions have been flushed.");
+          break;
+        }
+        LOG.info("Try #" + tries + ". Still waiting to flush " + targetTSMap.size() + " regions.");
+        long sleepTime = getPauseTime(tries);
+        Threads.sleep(sleepTime);
+        now = EnvironmentEdgeManager.currentTimeMillis();
+      }
+
+      // if we have not succeded in flushing all. Force flush.
+      if (!targetTSMap.isEmpty()) {
+        LOG.info("Forcing regions to flush.");
+        flushRegionsAtServers(targetTSMap, regionsUpdated, options);
+      }
+
+      clearMutationsToTable(tableName);
+    }
+
+    private void trackMutationsToTable(byte[] tableNameBytes,
+        HRegionInfo regionInfo, HServerAddress serverAddress) throws IOException {
+      String tableName = Bytes.toString(tableNameBytes);
+      HServerAddress oldAddress  = !batchedUploadUpdatesMap.containsKey(tableName) ? null
+          : batchedUploadUpdatesMap.get(tableName).putIfAbsent(regionInfo, serverAddress);
+      if (oldAddress != null && !oldAddress.equals(serverAddress)) {
+        throw new ClientSideDoNotRetryException("Region "
+            + regionInfo.getRegionNameAsString() + " moved from " + oldAddress
+            + ". but has moved to." + serverAddress );
+      }
+    }
+
+    /**
+     * Return a map of regions (and the servers they were on) to which
+     * mutations were made since {@link startBatchedLoad(tableName)} was
+     * called.
+     * @param tableName
+     * @return Map containing regionInfo, and the servers they were on.
+     */
+    private Map<HRegionInfo, HServerAddress> getRegionsUpdated(byte[] tableName) {
+      return batchedUploadUpdatesMap.get(Bytes.toString(tableName));
+    }
+
+    /**
+     * Clear the map of regions (and the servers) contacted for the specified
+     * table.
+     * @param tableName
+     */
+    private void clearMutationsToTable(byte[] tableNameBytes) {
+      String tableName = Bytes.toString(tableNameBytes);
+      if (batchedUploadUpdatesMap.containsKey(tableName)) {
+        batchedUploadUpdatesMap.get(tableName).clear();
+      }
+    }
+
+    /**
+     * Get the current time in milliseconds at the server for each
+     * of the regions in the map.
+     * @param regionsContacted -- map of regions to server address
+     * @param options -- rpc options to use
+     * @return Map of regions to Long, representing current time in mills at the server
+     * @throws IOException if (a) could not talk to the server, or (b) any of the regions
+     * have moved from the location indicated in regionsContacted.
+     */
+    private Map<HRegionInfo, Long> getCurrentTimeForRegions(
+        Map<HRegionInfo, HServerAddress> regionsContacted,
+        HBaseRPCOptions options) throws IOException {
+
+      Map<HRegionInfo, Long> currentTimeForRegions =
+          new HashMap<HRegionInfo, Long>();
+
+      Map<HServerAddress, Long> currentTimeAtServers =
+          new HashMap<HServerAddress, Long>();
+      HashMap<HServerAddress, Future<Long>> futures =
+          new HashMap<HServerAddress,Future<Long>>();
+
+      // get flush times from that server
+      for (HServerAddress server: regionsContacted.values()) {
+        futures.put(server, HTable.multiActionThreadPool.submit(
+            createCurrentTimeCallable(server, options)));
+      }
+
+      // populate serverTimes;
+      for (HServerAddress server: futures.keySet()) {
+        Future<Long> future = futures.get(server);
+        try {
+          currentTimeAtServers.put(server, future.get());
+        } catch (InterruptedException e) {
+          throw new IOException("Interrupted: Could not get current time from server"
+                      + server);
+        } catch (ExecutionException e) {
+          throw new IOException("Could not get current time from " + server,
+              e.getCause());
+        }
+      }
+
+      for(HRegionInfo region: regionsContacted.keySet()) {
+        HServerAddress serverToLookFor = regionsContacted.get(region);
+        Long currentTime = currentTimeAtServers.get(serverToLookFor);
+        currentTimeForRegions.put(region, currentTime);
+      }
+      return currentTimeForRegions;
+    }
+
+    /**
+     * Ask the regionservers to flush the given regions, if they have not flushed
+     * past the desired time.
+     * @param targetTSMap -- map of regions to the desired flush time
+     * @param regionsContacted -- map of regions to server address
+     * @param options -- rpc options to use
+     * @throws IOException if (a) could not talk to the server, or (b) any of the regions
+     * have moved from the location indicated in regionsContacted.
+     */
+    private void  flushRegionsAtServers(Map<HRegionInfo, Long> targetTSMap,
+        Map<HRegionInfo, HServerAddress> regionsContacted,
+        HBaseRPCOptions options) throws IOException {
+
+      Map<HRegionInfo, Future<Void>> futures =
+          new HashMap<HRegionInfo, Future<Void>>();
+
+      // get flush times from that server
+      for (HRegionInfo region: targetTSMap.keySet()) {
+        HServerAddress server = regionsContacted.get(region);
+        long targetFlushTime = targetTSMap.get(region).longValue();
+
+        LOG.debug("forcing a flush at " + server.getHostname());
+        futures.put(region, HTable.multiActionThreadPool.submit(
+            // use targetFlushTime + 1 to force a flush even if they are equal.
+            createFlushCallable(server, region, targetFlushTime + 1, options)));
+      }
+
+      boolean toCancel = false;
+      IOException toThrow = null;
+      // populate regionTimes;
+      for(HRegionInfo region: futures.keySet()) {
+        Future<Void> future = futures.get(region);
+        try {
+          if (!toCancel) {
+            future.get();
+          } else {
+            future.cancel(true);
+          }
+        } catch (InterruptedException e) {
+          toCancel = true;
+          toThrow = new IOException("Got Interrupted: Could not flush region " + region);
+        } catch (ExecutionException e) {
+          toThrow = new IOException("Could not flush region " + region, e.getCause());
+        }
+      }
+      if (toThrow != null) {
+        throw toThrow;
+      }
+    }
+
+    /**
+     * Ask the regionservers for the last flushed time for each regions.
+     * @param regionsContacted -- map of regions to server address
+     * @param options -- rpc options to use
+     * @return Map from the region to the region's last flushed time.
+     * @throws IOException if (a) could not talk to the server, or (b) any of the regions
+     * have moved from the location indicated in regionsContacted.
+     */
+    private Map<HRegionInfo, Long> getRegionFlushTimes(
+        Map<HRegionInfo, HServerAddress> regionsContacted,
+        HBaseRPCOptions options) throws IOException {
+
+      Map<HRegionInfo, Long> regionFlushTimesMap =
+          new HashMap<HRegionInfo, Long>();
+
+      Map<HServerAddress, MapWritable> rsRegionTimes =
+          new HashMap<HServerAddress, MapWritable>();
+      Map<HServerAddress, Future<MapWritable>> futures =
+          new HashMap<HServerAddress, Future<MapWritable>>();
+
+      // get flush times from that server
+      for (HServerAddress server: regionsContacted.values()) {
+        futures.put(server, HTable.multiActionThreadPool.submit(
+            createGetLastFlushTimesCallable(server, options)));
+      }
+
+      boolean toCancel = false;
+      IOException toThrow = null;
+      // populate flushTimes;
+      for(HServerAddress server: futures.keySet()) {
+        Future<MapWritable> future = futures.get(server);
+        try {
+          if (!toCancel) {
+            rsRegionTimes.put(server, future.get());
+          } else {
+            future.cancel(true);
+          }
+        } catch (InterruptedException e) {
+          toThrow = new IOException("Got Interrupted: Could not get last flushed times");
+        } catch (ExecutionException e) {
+          toThrow = new IOException("Could not get last flushed times from " + server, e.getCause());
+        }
+      }
+      if (toThrow != null) {
+        throw toThrow;
+      }
+
+      for(HRegionInfo region: regionsContacted.keySet()) {
+        HServerAddress serverToLookFor = regionsContacted.get(region);
+
+        MapWritable serverMap = rsRegionTimes.get(serverToLookFor);
+        LongWritable lastFlushedTime = (LongWritable) serverMap.get(
+            new BytesWritable(region.getRegionName()));
+
+        if (lastFlushedTime == null) { // The region is no longer on the server
+          throw new ClientSideDoNotRetryException("Region "
+                                + region.getRegionNameAsString() + " was on "
+                                + serverToLookFor + " but no longer there." );
+        }
+
+        regionFlushTimesMap.put(region, new Long(lastFlushedTime.get()));
+      }
+      return regionFlushTimesMap;
+    }
   }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1387312&r1=1387311&r2=1387312&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Tue Sep 18 18:40:20 2012
@@ -1322,4 +1322,23 @@ public class HTable implements HTableInt
   public Compression.Algorithm getRxCompression() {
     return this.options.getRxCompression();
   }
+
+  /**
+   * Starts tracking the updates made to this table so that
+   * we can ensure that the updates were completed and flushed to
+   * disk at the end of the job.
+   */
+  public void startBatchedLoad() {
+    connection.startBatchedLoad(tableName);
+  }
+
+  /**
+   * Ensure that all the updates made to the table, since
+   * startBatchedLoad was called are persisted. This method
+   * waits for all the regionservers contacted, to
+   * flush all the data written so far.
+   */
+  public void endBatchedLoad() throws IOException {
+    connection.endBatchedLoad(tableName, this.options);
+  }
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallableForBatchOps.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallableForBatchOps.java?rev=1387312&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallableForBatchOps.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallableForBatchOps.java Tue Sep 18 18:40:20 2012
@@ -0,0 +1,35 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+
+/**
+ * A partial implementation of ServerCallable, used for batch operations
+ * spanning multiple regions/rows.
+ * @param <T> the class that the ServerCallable handles
+ */
+public abstract class ServerCallableForBatchOps<T> extends ServerCallable<T> {
+  HServerAddress address;
+
+  public ServerCallableForBatchOps(HConnection connection, HServerAddress address,
+      HBaseRPCOptions options) {
+    super(connection, null, null, options);
+    this.address = address;
+  }
+
+  @Override
+  public void instantiateRegionLocation(boolean reload) throws IOException {
+     // we don't need to locate the region, since we have been given the address of the
+     // server. But, let us store the information in this.location, so that
+     // we can handle failures (i.e. clear cache) if we fail to connect to the RS.
+     this.location = new HRegionLocation(null, address);
+  }
+
+  @Override
+  public void instantiateServer() throws IOException {
+    server = connection.getHRegionConnection(address, options);
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1387312&r1=1387311&r2=1387312&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Sep 18 18:40:20 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.master.AssignmentPlan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.io.MapWritable;
 
 /**
  * Clients interact with HRegionServers using a handle to the HRegionInterface.
@@ -93,12 +95,24 @@ public interface HRegionInterface extend
     throws IllegalArgumentException, IOException;
 
   /**
-   * Gets last flush time for the given region
+   * Gets last flush time (in milli sec) for the given region
    * @return the last flush time for a region
    */
   public long getLastFlushTime(byte[] regionName);
 
   /**
+   * Gets last flush time (in milli sec) for all regions on the server
+   * @return a map of regionName to the last flush time for the region
+   */
+  public MapWritable getLastFlushTimes();
+
+  /**
+   * Gets the current time (in milli sec) at the region server
+   * @return time in milli seconds at the regionserver.
+   */
+  public long getCurrentTimeMillis();
+
+  /**
    * Get a list of store files for a particular CF in a particular region
    * @param region name
    * @param CF name
@@ -393,6 +407,6 @@ public interface HRegionInterface extend
   /**
    * Update the configuration.
    */
-  public void updateConfiguration()	throws IOException;
+  public void updateConfiguration() throws IOException;
 
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1387312&r1=1387311&r2=1387312&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Sep 18 18:40:20 2012
@@ -273,7 +273,7 @@ public class HRegion implements HeapSize
 
   final long timestampTooNew;
   final long memstoreFlushSize;
-  private volatile long lastFlushTime;
+  private volatile long lastFlushTime; // start time for the last successful flush.
   private List<Pair<Long,Long>> recentFlushes
     = new ArrayList<Pair<Long,Long>>();
   final FlushRequester flushListener;
@@ -1398,11 +1398,10 @@ public class HRegion implements HeapSize
   protected boolean internalFlushcache(final HLog wal, final long myseqid,
       MonitoredTask status) throws IOException {
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
-    // Clear flush flag.
-    // Record latest flush time
-    this.lastFlushTime = startTime;
     // If nothing to flush, return and avoid logging start/stop flush.
     if (this.memstoreSize.get() <= 0) {
+      // Record latest flush time
+      this.lastFlushTime = startTime;
       return false;
     }
     if (LOG.isDebugEnabled()) {
@@ -1528,6 +1527,8 @@ public class HRegion implements HeapSize
 
     // If we get to here, the HStores have been written. If we get an
     // error in completeCacheFlush it will release the lock it is holding
+    // update lastFlushTime after the HStores have been written.
+    this.lastFlushTime = startTime;
 
     // B.  Write a FLUSHCACHE-COMPLETE message to the log.
     //     This tells future readers that the HStores were emitted correctly,
@@ -1862,6 +1863,7 @@ public class HRegion implements HeapSize
     HRegion.writeOps.incrementAndGet();
   }
 
+
   /**
    * @param put
    * @throws IOException

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1387312&r1=1387311&r2=1387312&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Sep 18 18:40:20 2012
@@ -143,6 +143,8 @@ import org.apache.hadoop.hbase.util.Runt
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -2893,6 +2895,21 @@ public class HRegionServer implements HR
     return region.getLastFlushTime();
   }
 
+  @Override
+  public MapWritable getLastFlushTimes() {
+     MapWritable map = new MapWritable();
+     for (HRegion region: this.getOnlineRegions()) {
+       map.put(new BytesWritable(region.getRegionName()),
+           new LongWritable(region.getLastFlushTime()));
+     }
+     return map;
+  }
+
+  @Override
+  public long getCurrentTimeMillis() {
+    return EnvironmentEdgeManager.currentTimeMillis();
+  }
+
   public Map<HRegionInfo, String> getSortedOnlineRegionInfosAndOpenDate() {
     TreeMap<HRegionInfo, String> result = new TreeMap<HRegionInfo, String>();
     synchronized(this.onlineRegions) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1387312&r1=1387311&r2=1387312&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java Tue Sep 18 18:40:20 2012
@@ -138,7 +138,7 @@ public class Threads {
   /**
    * @param millis How long to sleep for in milliseconds.
    */
-  public static void sleep(int millis) {
+  public static void sleep(long millis) {
     try {
       Thread.sleep(millis);
     } catch (InterruptedException e) {
@@ -150,7 +150,7 @@ public class Threads {
    * Sleeps for the given amount of time. Retains the thread's interruption status. 
    * @param millis How long to sleep for in milliseconds.
    */
-  public static void sleepRetainInterrupt(int millis) {
+  public static void sleepRetainInterrupt(long millis) {
     try {
       Thread.sleep(millis);
     } catch (InterruptedException e) {

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java?rev=1387312&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java Tue Sep 18 18:40:20 2012
@@ -0,0 +1,150 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Run tests that use the HBase clients; {@link HTable} and {@link HTablePool}.
+ * Sets up the HBase mini cluster once at start and runs through all client tests.
+ * Each creates a table named for the method and does its stuff against that.
+ */
+public class TestBatchedUpload {
+  private static final Log LOG = LogFactory.getLog(TestBatchedUpload.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte [] FAMILY = Bytes.toBytes("testFamily");
+  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static int SLAVES = 5;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testBatchedUpload() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testBatchedUpload");
+    int NUM_REGIONS = 10;
+    HTable ht = TEST_UTIL.createTable(TABLE, new byte[][]{FAMILY},
+        3, Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
+    int NUM_ROWS = 1000;
+
+    // start batch processing
+    // do a bunch of puts
+    // finish batch. Check for Exceptions.
+    int attempts = writeData(ht, NUM_ROWS);
+    assert(attempts > 1);
+
+    readData(ht, NUM_ROWS);
+
+    ht.close();
+  }
+
+  public int writeData(HTable table, long numRows) throws IOException {
+    int attempts = 0;
+    int MAX = 10;
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    int numRS;
+    Random rand = new Random(5234234);
+    double killProb = 2.0 / numRows;
+    double prob;
+    int kills = 0;
+
+    while (attempts < MAX) {
+      try {
+        attempts++;
+
+        // start batched session
+        table.startBatchedLoad();
+
+        // do batched puts
+        // with WAL turned off
+        for (long i = 0; i < numRows; i++) {
+          byte [] rowKey = longToByteArrayKey(i);
+          Put put = new Put(rowKey);
+          byte[] value = rowKey; // value is the same as the row key
+          put.add(FAMILY, QUALIFIER, value);
+          put.setWriteToWAL(false);
+          table.put(put);
+
+          prob = rand.nextDouble();
+          if (kills < 2 && prob < killProb) { // kill up to 2 rs
+            kills++;
+            // kill a random one
+            numRS = cluster.getRegionServerThreads().size();
+            int idxToKill = Math.abs(rand.nextInt()) % numRS;
+            LOG.debug("Try " + attempts + " written Puts : " + i);
+            LOG.info("Randomly killing region server " + idxToKill + ". Got probability " + prob
+                + " < " + killProb);
+            cluster.abortRegionServer(idxToKill);
+
+            // keep decreasing the probability of killing the RS
+            killProb = killProb / 2;
+          }
+        }
+
+        LOG.info("Written all puts. Trying to end Batch");
+        // complete batched puts
+        table.endBatchedLoad();
+        return attempts;
+
+      } catch (IOException e) {
+        e.printStackTrace();
+        LOG.info("Failed try # " + attempts);
+      }
+    }
+
+    throw new IOException("Failed to do batched puts after " + MAX + " retries.");
+  }
+
+  public void readData(HTable table, long numRows) throws IOException {
+    for(long i = 0; i < numRows; i++) {
+      byte [] rowKey = longToByteArrayKey(i);
+
+      Get get = new Get(rowKey);
+      get.addColumn(FAMILY, QUALIFIER);
+      get.setMaxVersions(1);
+      Result result = table.get(get);
+      assertTrue(Arrays.equals(rowKey, result.getValue(FAMILY, QUALIFIER)));
+    }
+  }
+
+  private byte[] longToByteArrayKey(long rowKey) {
+    return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
+  }
+}