Posted to commits@hbase.apache.org by mb...@apache.org on 2012/09/22 14:52:00 UTC

svn commit: r1388800 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase...

Author: mbautin
Date: Sat Sep 22 12:51:59 2012
New Revision: 1388800

URL: http://svn.apache.org/viewvc?rev=1388800&view=rev
Log:
[HBASE-5783] [0.89-fb] Improve false positives: Retry only if the server restarts

Author: aaiyer

Summary:
Batched upload should not abort the job if the regions
were moved for benign reasons.
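
In outline, the client now remembers the start code of the regionserver that served
each region during the batch (cached from .META. in HRegionLocation) and, at the end
of the batch, compares it with the start code the server reports over the new
getStartCode() RPC. Only a mismatch, i.e. an actual restart, aborts the job; a benign
region move leaves the start codes intact. A minimal sketch of that check, assuming a
helper fetchCurrentStartCode() in place of the real per-server callable:

// Hedged sketch, not the committed code: the real logic lives in
// HConnectionManager.checkServersAreAlive(); fetchCurrentStartCode() stands in
// for the new HRegionInterface.getStartCode() RPC issued once per server.
void verifyNoServerRestarted(Collection<HRegionLocation> regionsUpdated)
    throws IOException {
  Map<HServerAddress, Long> current = new HashMap<HServerAddress, Long>();
  for (HRegionLocation loc : regionsUpdated) {
    HServerAddress server = loc.getServerAddress();
    if (!current.containsKey(server)) {
      current.put(server, fetchCurrentStartCode(server)); // one RPC per server
    }
  }
  for (HRegionLocation loc : regionsUpdated) {
    long expected = loc.getServerStartCode();  // start code cached from .META.
    long actual = current.get(loc.getServerAddress()).longValue();
    if (actual != expected) {
      // A changed start code means the regionserver restarted after the data
      // was written, so the batch cannot be trusted and the job must redo it.
      throw new IOException("RegionServer restarted: " + loc);
    }
  }
}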

Test Plan:
Run MR tests.
Add a unit test to verify the behavior.
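
From the client's point of view (e.g. the MR job or the MultiThreadedWriter changes
below), a batch now only needs to be redone when endBatchedLoad() reports a restart.
A rough usage sketch, assuming a hypothetical writeBatch() helper that issues the Puts:

// Illustrative only: with this change a benign region move no longer fails the
// batch; only a regionserver restart (detected via start codes) does.
boolean done = false;
while (!done) {
  table.startBatchedLoad();
  writeBatch(table);           // hypothetical helper issuing the Puts
  try {
    table.endBatchedLoad();    // flushes, then verifies server start codes
    done = true;
  } catch (IOException e) {
    // surfaced as ClientSideDoNotRetryException when a server restarted
    LOG.info("A regionserver restarted during the batch; redoing it", e);
  }
}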

Reviewers: kannan, kranganathan, mbautin

Reviewed By: kranganathan

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D581121

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java Sat Sep 22 12:51:59 2012
@@ -26,6 +26,11 @@ package org.apache.hadoop.hbase;
 public class HRegionLocation implements Comparable<HRegionLocation> {
   private HRegionInfo regionInfo;
   private HServerAddress serverAddress;
+  private long serverStartCode;
+
+  public long getServerStartCode() {
+    return serverStartCode;
+  }
 
   /**
    * Constructor
@@ -34,8 +39,13 @@ public class HRegionLocation implements 
    * @param serverAddress the HServerAddress for the region server
    */
   public HRegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) {
+    this(regionInfo, serverAddress, -1);
+  }
+  public HRegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress,
+      long serverStartCode) {
     this.regionInfo = regionInfo;
     this.serverAddress = serverAddress;
+    this.serverStartCode = serverStartCode;
   }
 
   /**
@@ -43,8 +53,8 @@ public class HRegionLocation implements 
    */
   @Override
   public String toString() {
-    return "address: " + this.serverAddress.toString() + ", regioninfo: " +
-      this.regionInfo;
+    return "address: " + this.serverAddress.toString() + ", serverStartCode: " +
+      this.serverStartCode + ", regioninfo: " + this.regionInfo;
   }
 
   /**
@@ -71,6 +81,7 @@ public class HRegionLocation implements 
   public int hashCode() {
     int result = this.regionInfo.hashCode();
     result ^= this.serverAddress.hashCode();
+    result ^= (int)this.serverStartCode;
     return result;
   }
 
@@ -93,6 +104,10 @@ public class HRegionLocation implements 
     if(result == 0) {
       result = this.serverAddress.compareTo(o.serverAddress);
     }
+    if(result == 0) {
+      result = (this.serverStartCode > o.serverStartCode) ? 1 :
+      (this.serverStartCode < o.serverStartCode)? -1 :  0;
+    }
     return result;
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sat Sep 22 12:51:59 2012
@@ -28,6 +28,7 @@ import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -443,7 +444,7 @@ public class HConnectionManager {
 
     // keep track of servers that have been updated for batchedLoad
     // tablename -> Map
-    Map<String, ConcurrentMap<HRegionInfo, HServerAddress>> batchedUploadUpdatesMap;
+    Map<String, ConcurrentMap<HRegionInfo, HRegionLocation>> batchedUploadUpdatesMap;
     private int batchedUploadSoftFlushRetries;
     private long batchedUploadSoftFlushTimeoutMillis;
     /**
@@ -496,7 +497,7 @@ public class HConnectionManager {
       this.batchedUploadSoftFlushTimeoutMillis =
           conf.getLong("hbase.client.batched-upload.softflush.timeout.ms", 60000L); // 1 min
       batchedUploadUpdatesMap  = new ConcurrentHashMap<String,
-          ConcurrentMap<HRegionInfo, HServerAddress>>();
+          ConcurrentMap<HRegionInfo, HRegionLocation>>();
     }
 
     private long getPauseTime(int tries) {
@@ -878,9 +879,16 @@ public class HConnectionManager {
               }
               final String serverAddress = Bytes.toString(value);
 
+              value = result.getValue(HConstants.CATALOG_FAMILY,
+                  HConstants.STARTCODE_QUALIFIER);
+              long serverStartCode = -1;
+              if(value != null) {
+                serverStartCode = Bytes.toLong(value);
+              }
+
               // instantiate the location
               HRegionLocation loc = new HRegionLocation(regionInfo,
-                new HServerAddress(serverAddress));
+                new HServerAddress(serverAddress), serverStartCode);
               // cache this meta entry
               cacheLocation(tableName, loc);
             }
@@ -1020,9 +1028,16 @@ public class HConnectionManager {
               Bytes.toStringBinary(row));
           }
 
+          value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
+              HConstants.STARTCODE_QUALIFIER);
+          long serverStartCode = -1;
+          if(value != null) {
+            serverStartCode = Bytes.toLong(value);
+          }
           // instantiate the location
           location = new HRegionLocation(regionInfo,
-            new HServerAddress(serverAddress));
+            new HServerAddress(serverAddress),
+            serverStartCode);
           cacheLocation(tableName, location);
           return location;
         } catch (TableNotFoundException e) {
@@ -1721,6 +1736,22 @@ public class HConnectionManager {
       occasionallyCleanupFailureInformation();
     }
 
+    private Callable<Long> createGetServerStartCodeCallable(
+        final HServerAddress address,
+        final HBaseRPCOptions options) {
+      final HConnection connection = this;
+      return new Callable<Long>() {
+        public Long call() throws IOException {
+          return getRegionServerWithoutRetries(
+            new ServerCallableForBatchOps<Long>(connection, address, options) {
+              public Long call() throws IOException {
+                return server.getStartCode();
+              }
+            });
+        }
+      };
+    }
+
     private Callable<Long> createCurrentTimeCallable(
         final HServerAddress address,
         final HBaseRPCOptions options) {
@@ -1738,6 +1769,7 @@ public class HConnectionManager {
     }
 
 
+
     private Callable<MapWritable> createGetLastFlushTimesCallable(
         final HServerAddress address,
         final HBaseRPCOptions options) {
@@ -1843,8 +1875,7 @@ public class HConnectionManager {
            if (isGets) {
              actions.addGet(regionName, (Get)row, i);
            } else {
-             trackMutationsToTable(tableName,
-                 loc.getRegionInfo(), loc.getServerAddress());
+             trackMutationsToTable(tableName, loc);
              actions.mutate(regionName, (Mutation)row);
            }
          }
@@ -2270,9 +2301,7 @@ public class HConnectionManager {
           return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
               tableName, row, options) {
             public Integer call() throws IOException {
-              trackMutationsToTable(tableName,
-                  location.getRegionInfo(),
-                  location.getServerAddress());
+              trackMutationsToTable(tableName, location);
               return server.put(location.getRegionInfo().getRegionName(), puts);
             }
           });
@@ -2328,9 +2357,7 @@ public class HConnectionManager {
           getRegionServerWithRetries(new ServerCallable<Void>(this.c,
                 tableName, row, options) {
               public Void call() throws IOException {
-                trackMutationsToTable(tableName,
-                    location.getRegionInfo(),
-                    location.getServerAddress());
+                trackMutationsToTable(tableName, location);
                 server.mutateRow(location.getRegionInfo().getRegionName(),
                   mutations);
                 return null;
@@ -2356,9 +2383,7 @@ public class HConnectionManager {
           return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
                 tableName, row, options) {
               public Integer call() throws IOException {
-                trackMutationsToTable(tableName,
-                    location.getRegionInfo(),
-                    location.getServerAddress());
+                trackMutationsToTable(tableName, location);
                 return server.delete(location.getRegionInfo().getRegionName(),
                   deletes);
               }
@@ -2401,8 +2426,7 @@ public class HConnectionManager {
           regionPuts.put(address, mput);
         }
         mput.add(regionName, put);
-        trackMutationsToTable(tableName,
-            loc.getRegionInfo(), loc.getServerAddress());
+        trackMutationsToTable(tableName, loc);
       }
 
       return new ArrayList<MultiPut>(regionPuts.values());
@@ -2690,74 +2714,97 @@ public class HConnectionManager {
     @Override
     public void startBatchedLoad(byte[] tableName) {
       batchedUploadUpdatesMap.put(Bytes.toString(tableName),
-          new ConcurrentHashMap<HRegionInfo, HServerAddress>());
+          new ConcurrentHashMap<HRegionInfo, HRegionLocation>());
     }
 
     @Override
     public void endBatchedLoad(byte[] tableName, HBaseRPCOptions options) throws IOException {
-      Map<HRegionInfo, HServerAddress> regionsUpdated = getRegionsUpdated(tableName);
-
-      // get the current TS from the RegionServer
-      Map<HRegionInfo, Long> targetTSMap = getCurrentTimeForRegions(regionsUpdated, options);
+      Map<HRegionInfo, HRegionLocation> regionsUpdated = getRegionsUpdated(tableName);
 
-      // loop to ensure that we have flushed beyond the corresponding TS.
-      int tries = 0;
-      long now = EnvironmentEdgeManager.currentTimeMillis();
-      long waitUntilForHardFlush = now + batchedUploadSoftFlushTimeoutMillis;
+      try {
+        // get the current TS from the RegionServer
+        Map<HRegionInfo, Long> targetTSMap = getCurrentTimeForRegions(regionsUpdated, options);
 
-      while (tries++ < this.batchedUploadSoftFlushRetries
-          && now < waitUntilForHardFlush) {
-        // get the lastFlushedTS from the RegionServer. throws Exception if the region has
-        // moved elsewhere
-        Map<HRegionInfo, Long> flushedTSMap =
-            getRegionFlushTimes(regionsUpdated, options);
-
-        for (Entry<HRegionInfo, Long> entry: targetTSMap.entrySet()) {
-          HRegionInfo region = entry.getKey();
-          long targetTime = entry.getValue().longValue();
-          long flushedTime = flushedTSMap.get(region).longValue();
-          if (flushedTime > targetTime) {
-            targetTSMap.remove(region);
-          }
-          LOG.debug("Region " + region.getEncodedName() + " was flushed at "
-              + flushedTime
-              + (flushedTime > targetTime
-                  ? ". All updates we made are already on disk."
-                  : ". Still waiting for updates to go to the disk.")
-              + " Last update was made "
-              + (flushedTime > targetTime
-                  ? ((flushedTime - targetTime) + " ms before flush.")
-                  : ((targetTime - flushedTime) + " ms after flush.")));
+        // loop to ensure that we have flushed beyond the corresponding TS.
+        int tries = 0;
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        long waitUntilForHardFlush = now + batchedUploadSoftFlushTimeoutMillis;
+
+        while (tries++ < this.batchedUploadSoftFlushRetries
+            && now < waitUntilForHardFlush) {
+          // get the lastFlushedTS from the RegionServer. throws Exception if the region has
+          // moved elsewhere
+          Map<HRegionInfo, Long> flushedTSMap =
+              getRegionFlushTimes(regionsUpdated, options);
+
+          for (Entry<HRegionInfo, Long> entry: targetTSMap.entrySet()) {
+            HRegionInfo region = entry.getKey();
+            long targetTime = entry.getValue().longValue();
+            long flushedTime = flushedTSMap.get(region).longValue();
+            if (flushedTime > targetTime) {
+              targetTSMap.remove(region);
+            }
+            LOG.debug("Region " + region.getEncodedName() + " was flushed at "
+                + flushedTime
+                + (flushedTime > targetTime
+                    ? ". All updates we made are already on disk."
+                    : ". Still waiting for updates to go to the disk.")
+                + " Last update was made "
+                + (flushedTime > targetTime
+                    ? ((flushedTime - targetTime) + " ms before flush.")
+                    : ((targetTime - flushedTime) + " ms after flush.")));
+          }
+
+          if (targetTSMap.isEmpty()) {
+            LOG.info("All regions have been flushed.");
+            break;
+          }
+          LOG.info("Try #" + tries + ". Still waiting to flush " + targetTSMap.size() + " regions.");
+          long sleepTime = getPauseTime(tries);
+          Threads.sleep(sleepTime);
+          now = EnvironmentEdgeManager.currentTimeMillis();
         }
 
-        if (targetTSMap.isEmpty()) {
-          LOG.info("All regions have been flushed.");
-          break;
-        }
-        LOG.info("Try #" + tries + ". Still waiting to flush " + targetTSMap.size() + " regions.");
-        long sleepTime = getPauseTime(tries);
-        Threads.sleep(sleepTime);
-        now = EnvironmentEdgeManager.currentTimeMillis();
-      }
+        // if we have not succeeded in flushing all regions, force a flush.
+        if (!targetTSMap.isEmpty()) {
+          LOG.info("Forcing regions to flush.");
+          flushRegionsAtServers(targetTSMap, regionsUpdated, options);
+        }
+
+        // check to see that the servers' current startcode is the same as the one
+        // that we have in the map. Having a different start code means that the
+        // regionserver may have restarted. To avoid data loss in case the restart
+        // was unclean, we will throw an exception and expect the client to retry.
+        //
+        // Note that we should be doing this at the very end, after flushing.
+        // A successful region flush does not guarantee that the data was
+        // persisted correctly. A regionserver could have crashed/restarted,
+        // opened the same region again, and then flushed it.
+        checkServersAreAlive(regionsUpdated.values(), options);
 
-      // if we have not succeded in flushing all. Force flush.
-      if (!targetTSMap.isEmpty()) {
-        LOG.info("Forcing regions to flush.");
-        flushRegionsAtServers(targetTSMap, regionsUpdated, options);
+        clearMutationsToTable(tableName);
+      } catch (IOException e) {
+        throw new ClientSideDoNotRetryException("One or more regionservers have restarted"
+            + ". Please retry the job");
       }
-
-      clearMutationsToTable(tableName);
     }
 
     private void trackMutationsToTable(byte[] tableNameBytes,
-        HRegionInfo regionInfo, HServerAddress serverAddress) throws IOException {
+        HRegionLocation location) throws IOException {
       String tableName = Bytes.toString(tableNameBytes);
-      HServerAddress oldAddress  = !batchedUploadUpdatesMap.containsKey(tableName) ? null
-          : batchedUploadUpdatesMap.get(tableName).putIfAbsent(regionInfo, serverAddress);
-      if (oldAddress != null && !oldAddress.equals(serverAddress)) {
-        throw new ClientSideDoNotRetryException("Region "
-            + regionInfo.getRegionNameAsString() + " moved from " + oldAddress
-            + ". but has moved to." + serverAddress );
+      HRegionInfo regionInfo = location.getRegionInfo();
+      HServerAddress  serverAddress = location.getServerAddress();
+      HRegionLocation oldLocation  = !batchedUploadUpdatesMap.containsKey(tableName) ? null
+          : batchedUploadUpdatesMap.get(tableName).putIfAbsent(regionInfo, location);
+      if (oldLocation != null && !oldLocation.equals(location)) {
+        // check if the old server is alive
+        try {
+          checkIfAlive(oldLocation);
+        } catch (IOException e) {
+          throw new ClientSideDoNotRetryException("Region "
+              + regionInfo.getRegionNameAsString() + " moved from " + oldLocation
+              + " to." + serverAddress + ". Old location not reachable" );
+        }
       }
     }
 
@@ -2768,7 +2815,7 @@ public class HConnectionManager {
      * @param tableName
      * @return Map containing regionInfo, and the servers they were on.
      */
-    private Map<HRegionInfo, HServerAddress> getRegionsUpdated(byte[] tableName) {
+    private Map<HRegionInfo, HRegionLocation> getRegionsUpdated(byte[] tableName) {
       return batchedUploadUpdatesMap.get(Bytes.toString(tableName));
     }
 
@@ -2785,6 +2832,72 @@ public class HConnectionManager {
     }
 
     /**
+     * This method is called to check if the remote server is alive, and has the
+     * same invocation id/start code, as before.
+     *
+     * @param location
+     * @throws RuntimeException
+     * @throws IOException
+     */
+    private void checkIfAlive(HRegionLocation location) throws IOException {
+        checkServersAreAlive(Collections.singletonList(location), HBaseRPCOptions.DEFAULT);
+    }
+
+    /**
+     * This method is called to check if the servers that hosted the regions are alive,
+     * and have the same invocation id/start code, as before.
+     * @param regionsContacted -- collection of region locations
+     * @param options -- rpc options to use
+     * @throws IOException if (a) could not talk to the server, or (b) any of the regions
+     * have moved from the location indicated in regionsContacted.
+     */
+    private void checkServersAreAlive(
+        Collection<HRegionLocation> regionsContacted,
+        HBaseRPCOptions options) throws IOException {
+
+      HashMap<HServerAddress, Future<Long>> futures =
+          new HashMap<HServerAddress,Future<Long>>();
+      Map<HServerAddress, Long> currentServerStartCode =
+          new HashMap<HServerAddress, Long>();
+
+      // get start codes from that server
+      for (HRegionLocation location: regionsContacted) {
+        HServerAddress server = location.getServerAddress();
+        if (!futures.containsKey(server)) {
+          futures.put(server, HTable.multiActionThreadPool.submit(
+              createGetServerStartCodeCallable(server, options)));
+        }
+      }
+
+      // populate server start codes;
+      for (HServerAddress server: futures.keySet()) {
+        Future<Long> future = futures.get(server);
+        try {
+          currentServerStartCode.put(server, future.get());
+        } catch (InterruptedException e) {
+          throw new IOException("Interrupted: Could not get current time from server"
+                      + server);
+        } catch (ExecutionException e) {
+          throw new IOException("Could not get current time from " + server,
+              e.getCause());
+        }
+      }
+
+      // check startcodes for each region
+      for (HRegionLocation location: regionsContacted) {
+        HServerAddress server = location.getServerAddress();
+        long expectedStartCode = location.getServerStartCode();
+          Long startCode = currentServerStartCode.get(server);
+          if (startCode.longValue() != expectedStartCode) {
+            LOG.debug("Current startcode for server " + server + " is " + startCode.longValue()
+                + " looking for " + location.toString());
+            throw new IOException("RegionServer restarted.");
+          }
+      }
+    }
+
+
+    /**
      * Get the current time in milliseconds at the server for each
      * of the regions in the map.
      * @param regionsContacted -- map of regions to server address
@@ -2794,7 +2907,7 @@ public class HConnectionManager {
      * have moved from the location indicated in regionsContacted.
      */
     private Map<HRegionInfo, Long> getCurrentTimeForRegions(
-        Map<HRegionInfo, HServerAddress> regionsContacted,
+        Map<HRegionInfo, HRegionLocation> regionsContacted,
         HBaseRPCOptions options) throws IOException {
 
       Map<HRegionInfo, Long> currentTimeForRegions =
@@ -2806,9 +2919,12 @@ public class HConnectionManager {
           new HashMap<HServerAddress,Future<Long>>();
 
       // get flush times from that server
-      for (HServerAddress server: regionsContacted.values()) {
-        futures.put(server, HTable.multiActionThreadPool.submit(
-            createCurrentTimeCallable(server, options)));
+      for (HRegionLocation location: regionsContacted.values()) {
+        HServerAddress server = location.getServerAddress();
+        if (!futures.containsKey(server)) {
+          futures.put(server, HTable.multiActionThreadPool.submit(
+              createCurrentTimeCallable(server, options)));
+        }
       }
 
       // populate serverTimes;
@@ -2826,7 +2942,7 @@ public class HConnectionManager {
       }
 
       for(HRegionInfo region: regionsContacted.keySet()) {
-        HServerAddress serverToLookFor = regionsContacted.get(region);
+        HServerAddress serverToLookFor = regionsContacted.get(region).getServerAddress();
         Long currentTime = currentTimeAtServers.get(serverToLookFor);
         currentTimeForRegions.put(region, currentTime);
       }
@@ -2837,13 +2953,13 @@ public class HConnectionManager {
      * Ask the regionservers to flush the given regions, if they have not flushed
      * past the desired time.
      * @param targetTSMap -- map of regions to the desired flush time
-     * @param regionsContacted -- map of regions to server address
+     * @param regionsUpdated -- map of regions to server address
      * @param options -- rpc options to use
      * @throws IOException if (a) could not talk to the server, or (b) any of the regions
      * have moved from the location indicated in regionsContacted.
      */
     private void  flushRegionsAtServers(Map<HRegionInfo, Long> targetTSMap,
-        Map<HRegionInfo, HServerAddress> regionsContacted,
+        Map<HRegionInfo, HRegionLocation> regionsUpdated,
         HBaseRPCOptions options) throws IOException {
 
       Map<HRegionInfo, Future<Void>> futures =
@@ -2851,7 +2967,7 @@ public class HConnectionManager {
 
       // get flush times from that server
       for (HRegionInfo region: targetTSMap.keySet()) {
-        HServerAddress server = regionsContacted.get(region);
+        HServerAddress server = regionsUpdated.get(region).getServerAddress();
         long targetFlushTime = targetTSMap.get(region).longValue();
 
         LOG.debug("forcing a flush at " + server.getHostname());
@@ -2885,14 +3001,14 @@ public class HConnectionManager {
 
     /**
      * Ask the regionservers for the last flushed time for each regions.
-     * @param regionsContacted -- map of regions to server address
+     * @param regionsUpdated -- map of regions to server address
      * @param options -- rpc options to use
      * @return Map from the region to the region's last flushed time.
      * @throws IOException if (a) could not talk to the server, or (b) any of the regions
      * have moved from the location indicated in regionsContacted.
      */
     private Map<HRegionInfo, Long> getRegionFlushTimes(
-        Map<HRegionInfo, HServerAddress> regionsContacted,
+        Map<HRegionInfo, HRegionLocation> regionsUpdated,
         HBaseRPCOptions options) throws IOException {
 
       Map<HRegionInfo, Long> regionFlushTimesMap =
@@ -2904,9 +3020,12 @@ public class HConnectionManager {
           new HashMap<HServerAddress, Future<MapWritable>>();
 
       // get flush times from that server
-      for (HServerAddress server: regionsContacted.values()) {
-        futures.put(server, HTable.multiActionThreadPool.submit(
-            createGetLastFlushTimesCallable(server, options)));
+      for (HRegionLocation location: regionsUpdated.values()) {
+        HServerAddress server = location.getServerAddress();
+        if (!futures.containsKey(server)) {
+          futures.put(server, HTable.multiActionThreadPool.submit(
+              createGetLastFlushTimesCallable(server, options)));
+        }
       }
 
       boolean toCancel = false;
@@ -2930,8 +3049,8 @@ public class HConnectionManager {
         throw toThrow;
       }
 
-      for(HRegionInfo region: regionsContacted.keySet()) {
-        HServerAddress serverToLookFor = regionsContacted.get(region);
+      for(HRegionInfo region: regionsUpdated.keySet()) {
+        HServerAddress serverToLookFor = regionsUpdated.get(region).getServerAddress();
 
         MapWritable serverMap = rsRegionTimes.get(serverToLookFor);
         LongWritable lastFlushedTime = (LongWritable) serverMap.get(

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Sat Sep 22 12:51:59 2012
@@ -109,6 +109,12 @@ public interface HRegionInterface extend
   public long getCurrentTimeMillis();
 
   /**
+   * Gets the current startCode at the region server
+   * @return startCode -- time in milliseconds when the regionserver started.
+   */
+  public long getStartCode();
+
+  /**
    * Get a list of store files for a particular CF in a particular region
    * @param region name
    * @param CF name

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Sep 22 12:51:59 2012
@@ -2897,6 +2897,11 @@ public class HRegionServer implements HR
   }
 
   @Override
+  public long getStartCode() {
+    return this.serverInfo.getStartCode();
+  }
+
+  @Override
   public long getCurrentTimeMillis() {
     return EnvironmentEdgeManager.currentTimeMillis();
   }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java Sat Sep 22 12:51:59 2012
@@ -67,7 +67,7 @@ public class TestBatchedUpload {
     // start batch processing
     // do a bunch of puts
     // finish batch. Check for Exceptions.
-    int attempts = writeData(ht, NUM_ROWS);
+    int attempts = writeData(ht, NUM_ROWS, true);
     assert(attempts > 1);
 
     readData(ht, NUM_ROWS);
@@ -75,7 +75,39 @@ public class TestBatchedUpload {
     ht.close();
   }
 
-  public int writeData(HTable table, long numRows) throws IOException {
+  @Test
+  /*
+   * Test to make sure that if a region moves benignly, and both
+   * the source and dest region servers are alive, then the batch
+   * should succeed.
+   */
+  public void testBatchedUploadWithRegionMoves() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testBatchedUploadWithRegionMoves");
+    int NUM_REGIONS = 10;
+    HTable ht = TEST_UTIL.createTable(TABLE, new byte[][]{FAMILY},
+        3, Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
+    int NUM_ROWS = 1000;
+
+    // start batch processing
+    // do a bunch of puts
+    // finish batch. Check for Exceptions.
+    int attempts = writeData(ht, NUM_ROWS, false);
+    assert(attempts == 1);
+
+    readData(ht, NUM_ROWS);
+
+    ht.close();
+  }
+
+  /**
+   * Write data to the htable, while randomly killing/shutting down regionservers.
+   * @param table
+   * @param numRows
+   * @param killRS -- true to kill the RS; false to do a clean shutdown.
+   * @return number of attempts to complete the batch.
+   * @throws IOException
+   */
+  public int writeData(HTable table, long numRows, boolean killRS) throws IOException {
     int attempts = 0;
     int MAX = 10;
     MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
@@ -109,9 +141,15 @@ public class TestBatchedUpload {
             numRS = cluster.getRegionServerThreads().size();
             int idxToKill = Math.abs(rand.nextInt()) % numRS;
             LOG.debug("Try " + attempts + " written Puts : " + i);
-            LOG.info("Randomly killing region server " + idxToKill + ". Got probability " + prob
-                + " < " + killProb);
-            cluster.abortRegionServer(idxToKill);
+            if (killRS) {
+              LOG.info("Randomly killing region server " + idxToKill
+                  + ". Got probability " + prob + " < " + killProb);
+              cluster.abortRegionServer(idxToKill);
+            } else { // clean shutdown
+              LOG.info("Randomly shutting down region server " + idxToKill
+                  + ". Got probability " + prob + " < " + killProb);
+              cluster.stopRegionServer(idxToKill);
+            }
 
             // keep decreasing the probability of killing the RS
             killProb = killProb / 2;

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Sat Sep 22 12:51:59 2012
@@ -88,6 +88,8 @@ public class LoadTestTool extends Abstra
 
   private static final String OPT_KEY_WINDOW = "key_window";
   private static final String OPT_WRITE = "write";
+  private static final String OPT_BATCHED_WRITES = "batch";
+  private static final String OPT_BATCHED_WRITES_CNT = "batch_cnt";
   private static final String OPT_MAX_READ_ERRORS = "max_read_errors";
   private static final String OPT_MULTIPUT = "multiput";
   private static final String OPT_NUM_KEYS = "num_keys";
@@ -133,6 +135,10 @@ public class LoadTestTool extends Abstra
   private int verifyPercent;
   private int profilePercent = 0;
 
+  private boolean isBatched;
+
+  private int batchSize;
+
   private String[] splitColonSeparated(String option,
       int minNumCols, int maxNumCols) {
     String optVal = cmd.getOptionValue(option);
@@ -197,6 +203,8 @@ public class LoadTestTool extends Abstra
         "without port numbers");
     addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
     addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
+    addOptWithArg(OPT_BATCHED_WRITES, "Use batched writes (with WAL)");
+    addOptWithArg(OPT_BATCHED_WRITES_CNT, "Size of a batch (if using batched writes)");
     addOptWithArg(OPT_READ, OPT_USAGE_READ);
     addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
     addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
@@ -261,8 +269,15 @@ public class LoadTestTool extends Abstra
       }
 
       isMultiPut = cmd.hasOption(OPT_MULTIPUT);
+      isBatched = cmd.hasOption(OPT_BATCHED_WRITES);
+      if (cmd.hasOption(OPT_BATCHED_WRITES_CNT)) {
+        batchSize = parseInt(cmd.getOptionValue(OPT_BATCHED_WRITES_CNT),
+            1, Integer.MAX_VALUE);
+      }
 
       System.out.println("Multi-puts: " + isMultiPut);
+      System.out.println("isBatched: " + isBatched
+                        + (isBatched ? " batch size " + batchSize : "" ));
       System.out.println("Columns per key: " + minColsPerKey + ".."
           + maxColsPerKey);
       System.out.println("Data size per column: " + minColDataSize + ".."
@@ -345,6 +360,8 @@ public class LoadTestTool extends Abstra
       writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY, 
           profilePercent, this.txCompression, this.rxCompression);
       writerThreads.setMultiPut(isMultiPut);
+      writerThreads.setBatching(isBatched);
+      writerThreads.setBatchSize(batchSize);
       writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
       writerThreads.setDataSize(minColDataSize, maxColDataSize);
     }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Sat Sep 22 12:51:59 2012
@@ -83,6 +83,9 @@ public class MultiThreadedWriter extends
   /** RPC compression */
   private Compression.Algorithm txCompression;
   private Compression.Algorithm rxCompression;
+
+  private boolean isBatched = false;
+  private int batchSize = Integer.MAX_VALUE;
   
   public MultiThreadedWriter(Configuration conf, byte[] tableName,
       byte[] columnFamily) {
@@ -156,6 +159,10 @@ public class MultiThreadedWriter extends
     public void run() {
       try {
         long rowKey;
+        int count = 0;
+        if (isBatched) {
+          table.startBatchedLoad();
+        }
         while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) {
           long numColumns = minColumnsPerKey + Math.abs(random.nextLong())
               % (maxColumnsPerKey - minColumnsPerKey);
@@ -171,6 +178,26 @@ public class MultiThreadedWriter extends
           if (trackInsertedKeys) {
             insertedKeys.add(rowKey);
           }
+
+          if (isBatched && ++count % batchSize == 0) {
+            try {
+              table.endBatchedLoad();
+              LOG.info("Count so far " + count + ". Batch ended");
+            } catch (IOException e) {
+              // log and continue.
+              LOG.info("Count so far " + count + ". Batch failed ", e);
+            }
+            table.startBatchedLoad();
+          }
+        }
+        if (isBatched && ++count % batchSize != 0) {
+          try {
+            table.endBatchedLoad();
+            LOG.info("Count so far " + count + ". Batch ended");
+          } catch (IOException e) {
+            // log and continue.
+            LOG.info("Count so far " + count + ". Batch failed ", e);
+          }
         }
       } finally {
         try {
@@ -330,4 +357,12 @@ public class MultiThreadedWriter extends
     trackInsertedKeys = enable;
   }
 
+  public void setBatching(boolean isBatched) {
+    this.isBatched = isBatched;
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
 }