You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/07/25 20:32:55 UTC

svn commit: r1365690 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/regionserver/

Author: mbautin
Date: Wed Jul 25 18:32:54 2012
New Revision: 1365690

URL: http://svn.apache.org/viewvc?rev=1365690&view=rev
Log:
[HBASE-6451] Try to make the put path handle errors similar to getRegionServerWithRetries

Author: aaiyer

Summary:
Prakash has changed getRegionServerWithRetries to handle retries and
failures better. This change tries to bring similar improvements to the put
path.

Test Plan: Run on MR. Test out on dev cluster.

Reviewers: pkhemani, liyintang

Reviewed By: pkhemani

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D508747

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed Jul 25 18:32:54 2012
@@ -35,7 +35,6 @@ public final class HConstants {
   public enum OperationStatusCode {
     NOT_RUN,
     SUCCESS,
-    SANITY_CHECK_FAILURE,
     FAILURE;
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Wed Jul 25 18:32:54 2012
@@ -322,8 +322,10 @@ public interface HConnection extends Clo
    * Delete the cached location
    * @param tableName
    * @param row
+   * @param oldLoc
    */
-  public void deleteCachedLocation(final byte [] tableName, final byte [] row);
+  public void deleteCachedLocation(final byte [] tableName, final byte [] row,
+      HServerAddress oldLoc);
   
   /**
    * Enable or disable region cache prefetch for the table. It will be

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Jul 25 18:32:54 2012
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.HServerAd
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
@@ -882,7 +883,7 @@ public class HConnectionManager {
                 return location;
               }
             } else {
-              deleteCachedLocation(tableName, row);
+              deleteCachedLocation(tableName, row, null);
             }
 
           // Query the root or meta region for the location of the meta region
@@ -1033,10 +1034,11 @@ public class HConnectionManager {
     }
 
     /*
-     * Delete a cached location, if it satisfies the table name and row
-     * requirements.
+     * Delete a cached location for the specified table name and row
+     * if it is located on the (optionally) specified old location.
      */
-    public void deleteCachedLocation(final byte [] tableName, final byte [] row) {
+    public void deleteCachedLocation(final byte [] tableName,
+        final byte [] row, HServerAddress oldServer) {
       synchronized (this.cachedRegionLocations) {
         Map<byte [], HRegionLocation> tableLocations =
             getTableLocations(tableName);
@@ -1046,6 +1048,11 @@ public class HConnectionManager {
         if (!tableLocations.isEmpty()) {
           HRegionLocation rl = getCachedLocation(tableName, row);
           if (rl != null) {
+            // If oldLocation is specified. deleteLocation only if it is the same.
+            if (oldServer != null
+                && !oldServer.equals(rl.getServerAddress()))
+              return; // perhaps, some body else cleared and repopulated.
+
             tableLocations.remove(rl.getRegionInfo().getStartKey());
             if (LOG.isDebugEnabled()) {
               LOG.debug("Removed " +
@@ -1289,29 +1296,21 @@ public class HConnectionManager {
       callable.instantiateRegionLocation(false /* reload cache? */);
       for(int tries = 0; ; tries++) {
         try {
-          callable.instantiateServer();
-          return callable.call();
-        } catch (Throwable t) {
-          boolean isLocalException = !(t instanceof RemoteException);
-          // translateException throws DoNotRetryException or any
-          // non-IOException.
-          t = translateException(t);
-          if (isLocalException && (t instanceof SocketTimeoutException ||
-              t instanceof ConnectException ||
-              t instanceof ClosedChannelException ||
-              t instanceof SyncFailedException ||
-              t instanceof EOFException)) {
-            // XXX this list covers most connectivity exceptions but not all. 
-            // For example, in SocketOutputStream a plain IOException is thrown
-            // at times when the channel is closed.
-
-            // if thrown these exceptions, we clear all the cache entries that
-            // map to that slow/dead server; otherwise, let cache miss and ask
-            // .META. again to find the new location
-            HRegionLocation hrl = callable.location;
-            clearCachedLocationForServer(hrl.getServerAddress().toString());
+          return getRegionServerWithoutRetries(callable, false);
+        } catch (DoNotRetryIOException ioe) {
+          // clear cache if needed
+          if (ioe.getCause() instanceof NotServingRegionException) {
+            HRegionLocation prevLoc = callable.location;
+            if (prevLoc.getRegionInfo() != null) {
+              deleteCachedLocation(callable.tableName,
+                  prevLoc.getRegionInfo().getStartKey(),
+                  prevLoc.getServerAddress());
+            }
           }
 
+          // We are not supposed to retry; let the exception pass through.
+          throw ioe;
+        } catch (Throwable t) {
           exceptions.add(t);
           if (tries == numRetries - 1) {
             throw new RetriesExhaustedException(callable.getServerName(),
@@ -1319,8 +1318,14 @@ public class HConnectionManager {
           }
 
           HRegionLocation prevLoc = callable.location;
+          if (prevLoc.getRegionInfo() != null) {
+            deleteCachedLocation(callable.tableName,
+                prevLoc.getRegionInfo().getStartKey(),
+                prevLoc.getServerAddress());
+          }
           // do not retry if getting the location throws exception
-          callable.instantiateRegionLocation(true /* reload cache ? */);
+          callable.instantiateRegionLocation(false /* reload cache ? */);
+
           if (prevLoc.getServerAddress().
               equals(callable.location.getServerAddress())) {
             long pauseTime = getPauseTime(tries);
@@ -1339,11 +1344,11 @@ public class HConnectionManager {
               throw new InterruptedIOException();
             }
             // do not reload cache. While we were sleeping hopefully the cache
-            // has been re-populated. We had anyway invalidated it earlier
-            // before going to sleep.
+            // has been re-populated.
             callable.instantiateRegionLocation(false);
           } else {
             LOG.debug("getRegionServerWithRetries failed, " +
+                "region moved from " + prevLoc + " to " + callable.location +
                 "retrying immediately tries=" + tries, t);
           }
         }
@@ -1353,12 +1358,45 @@ public class HConnectionManager {
     @Override
     public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
         throws IOException, RuntimeException {
+        return getRegionServerWithoutRetries(callable, true);
+    }
+
+    /**
+     * Pass in a ServerCallable with your particular bit of logic defined and
+     * this method will pass it to the defined region server.
+     * @param <T> the type of the return value
+     * @param callable callable to run
+     * @return an object of type T
+     * @throws IOException if a remote or network exception occurs
+     * @throws RuntimeException other unspecified error
+     */
+    private <T> T getRegionServerWithoutRetries(ServerCallable<T> callable,
+        boolean instantiateRegionLocation)
+        throws IOException, RuntimeException {
       try {
-        callable.instantiateRegionLocation(false);
+        if (instantiateRegionLocation) callable.instantiateRegionLocation(false);
         callable.instantiateServer();
         return callable.call();
-      } catch (Throwable t) {
-        Throwable t2 = translateException(t);
+      } catch (Throwable t1) {
+        Throwable t2 = translateException(t1);
+        boolean isLocalException = !(t2 instanceof RemoteException);
+        // translateException throws DoNotRetryException or any
+        // non-IOException.
+        if (isLocalException && (t2 instanceof SocketTimeoutException ||
+            t2 instanceof ConnectException ||
+            t2 instanceof ClosedChannelException ||
+            t2 instanceof SyncFailedException ||
+            t2 instanceof EOFException)) {
+          // XXX this list covers most connectivity exceptions but not all.
+          // For example, in SocketOutputStream a plain IOException is thrown
+          // at times when the channel is closed.
+
+          // if thrown these exceptions, we clear all the cache entries that
+          // map to that slow/dead server; otherwise, let cache miss and ask
+          // .META. again to find the new location
+          clearCachedLocationForServer(callable.location.getServerAddress().toString());
+        }
+
         if (t2 instanceof IOException) {
           throw (IOException)t2;
         } else {
@@ -1381,7 +1419,10 @@ public class HConnectionManager {
 
                @Override
                public void instantiateRegionLocation(boolean reload) throws IOException {
-                 // do nothing.
+                  // we don't need to locate the region, since we have been given the address of the
+                  // server. But, let us store the information in this.location, so that
+                  // we can handle failures (i.e. clear cache) if we fail to connect to the RS.
+                  this.location = new HRegionLocation(null, address);
                }
 
                @Override
@@ -1503,19 +1544,20 @@ public class HConnectionManager {
        } catch (InterruptedException ie) {
          throw ie;
        } catch (ExecutionException e) {
-         translateException(e.getCause());
+         if(e.getCause() instanceof DoNotRetryIOException)
+           throw (DoNotRetryIOException)e.getCause();
        }
 
        // If we got a response. Let us go through the responses from each region and
        // process the Puts and Deletes.
        // If the response is null, we will add it to newWorkingList here.
        if (request.deletes != null) {
-         newWorkingList = processMutationResponseFromOneRegionServer(tableName, resp,
-             request.deletes, newWorkingList, true);
+         newWorkingList = processMutationResponseFromOneRegionServer(tableName, address,
+             resp, request.deletes, newWorkingList, true);
        }
        if (request.puts != null) {
-         newWorkingList = processMutationResponseFromOneRegionServer(tableName, resp,
-             request.puts, newWorkingList, false);
+         newWorkingList = processMutationResponseFromOneRegionServer(tableName, address,
+             resp, request.puts, newWorkingList, false);
        }
      }
       return newWorkingList;
@@ -1546,7 +1588,8 @@ public class HConnectionManager {
          } catch (InterruptedException ie) {
            throw ie;
          } catch (ExecutionException e) {
-           translateException(e.getCause());
+           if(e.getCause() instanceof DoNotRetryIOException)
+             throw (DoNotRetryIOException)e.getCause();
          }
 
          if (resp == null) {
@@ -1554,13 +1597,15 @@ public class HConnectionManager {
            LOG.debug("Failed all for server: " + address + ", removing from cache");
          }
 
-         newWorkingList = processGetResponseFromOneRegionServer(tableName, resp, request, orig_list,
-            newWorkingList, results);
+         newWorkingList = processGetResponseFromOneRegionServer(tableName, address,
+             request, resp, orig_list, newWorkingList, results);
        }
        return newWorkingList;
      }
 
-     private <R extends Mutation> List<Mutation> processMutationResponseFromOneRegionServer(byte[] tableName,
+     private <R extends Mutation> List<Mutation> processMutationResponseFromOneRegionServer(
+        byte[] tableName,
+        HServerAddress address,
         MultiResponse resp,
         Map<byte[], List<R>> map,
         List<Mutation> newWorkingList,
@@ -1593,7 +1638,7 @@ public class HConnectionManager {
 
             newWorkingList.addAll(regionOps);
             // enough to remove from cache one of the rows from the region
-            deleteCachedLocation(tableName, regionOps.get(0).getRow());
+            deleteCachedLocation(tableName, regionOps.get(0).getRow(), address);
           }
         }
 
@@ -1601,7 +1646,9 @@ public class HConnectionManager {
     }
 
     private List<Get> processGetResponseFromOneRegionServer(byte[] tableName,
-       MultiResponse resp, MultiAction request,
+       HServerAddress address,
+       MultiAction request,
+       MultiResponse resp,
        List<Get> orig_list, List<Get> newWorkingList,
        Result[] results) throws IOException {
 
@@ -1631,7 +1678,7 @@ public class HConnectionManager {
            }
 
            // enough to clear this once for a region
-           deleteCachedLocation(tableName, regionGets.get(0).getRow());
+           deleteCachedLocation(tableName, regionGets.get(0).getRow(), address);
          }
        }
 
@@ -1963,57 +2010,7 @@ public class HConnectionManager {
     }
 
     /**
-     * Process a single put request
-     * @param put The put request
-     * @param failed The failed request list
-     * @param tableName The table name for the put request
-     * @return The cause of exceptions when processing the put request.
-     * @throws IOException
-     */
-    private Throwable processSinglePut(Put put,
-        List<Put> failed, final byte[] tableName, HBaseRPCOptions options) throws IOException {
-      // XXX error handling should mirror getRegionServerWithRetries()
-      // Get server address
-      byte [] row = put.getRow();
-      HRegionLocation loc = locateRegion(tableName, row, true);
-      HServerAddress address = loc.getServerAddress();
-      byte [] regionName = loc.getRegionInfo().getRegionName();
-
-      // Convert the single put to the multiput
-      MultiPut multiPut = new MultiPut(address);
-      multiPut.add(regionName, put);
-
-      // Create the multiPutCallable
-      Callable<MultiPutResponse> multiPutCallable =
-        createPutCallable(multiPut.address, multiPut, tableName, options);
-
-      try {
-        // Get the MultiPutResponse
-        MultiPutResponse response = multiPutCallable.call();
-
-        // verify the result
-        Integer result = response.getAnswer(regionName);
-        if (result == null || result >= 0) {
-          LOG.debug("Failed all for region: " +
-              Bytes.toStringBinary(regionName));
-          failed.add(put);
-        }
-      } catch (Exception e) {
-        // process exceptions
-        LOG.debug("Failed all from " + address, e);
-        failed.add(put);
-
-        if (e.getCause() instanceof DoNotRetryIOException) {
-          // Just give up, leaving the batch put list in an
-          // untouched/semi-committed state
-          throw (DoNotRetryIOException) e.getCause();
-        }
-        return e.getCause();
-      }
-      return null;
-    }
-
-    /**
+     * Try to make the put path handle errors similar to getRegionServerWithRetries
      * Process a list of puts from a shared HTable thread pool.
      * @param list The input put request list
      * @param failed The failed put request list
@@ -2050,9 +2047,19 @@ public class HConnectionManager {
 
       List<Future<MultiPutResponse>> futures =
           new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
+      boolean singleServer = (multiPuts.size() == 1);
       for ( MultiPut put : multiPuts ) {
-        futures.add(HTable.multiActionThreadPool.submit(
-            createPutCallable(put.address, put, tableName, options)));
+        Callable<MultiPutResponse> callable = createPutCallable(put.address,
+            put, tableName, options);
+        Future<MultiPutResponse> task;
+        if (singleServer) {
+          task = new FutureTask<MultiPutResponse>(callable);
+          ((FutureTask<MultiPutResponse>)task).run();
+        }
+        else {
+            task = HTable.multiActionThreadPool.submit(callable);
+        }
+        futures.add(task);
       }
 
       // step 3:
@@ -2060,37 +2067,38 @@ public class HConnectionManager {
       for (int i = 0; i < futures.size(); i++ ) {
         Future<MultiPutResponse> future = futures.get(i);
         MultiPut request = multiPuts.get(i);
+        MultiPutResponse resp = null;
         try {
-          MultiPutResponse resp = future.get();
-
-          // For each region
-          for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
-            Integer result = resp.getAnswer(e.getKey());
-            if (result == null) {
-              // failed
-              LOG.debug("Failed all for region: " +
-                  Bytes.toStringBinary(e.getKey()) + ", removing from cache");
-              failed.addAll(e.getValue());
-            } else if (result >= 0) {
-              // some failures
-              List<Put> lst = e.getValue();
-              failed.addAll(lst.subList(result, lst.size()));
-              LOG.debug("Failed past " + result + " for region: " +
-                  Bytes.toStringBinary(e.getKey()) + ", removing from cache");
-            }
-          }
+          resp = future.get();
         } catch (InterruptedException e) {
-          // go into the failed list.
-          LOG.debug("Failed all from " + request.address, e);
-          failed.addAll(request.allPuts());
-        } catch (ExecutionException e) {
-          // all go into the failed list.
-          LOG.debug("Failed all from " + request.address, e);
-          failed.addAll(request.allPuts());
-
-          // Just give up, leaving the batch put list in an untouched/semi-committed state
-          if (e.getCause() instanceof DoNotRetryIOException) {
-            throw (DoNotRetryIOException) e.getCause();
+          Thread.currentThread().interrupt();
+          throw new InterruptedIOException(e.getMessage());
+        } catch (ExecutionException ex) {
+          // Retry, unless the cause is a DoNotRetryIOException.
+          if (ex.getCause() instanceof DoNotRetryIOException)
+            throw (DoNotRetryIOException)ex.getCause();
+        }
+
+        // For each region
+        for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
+          byte[] region = e.getKey();
+          List<Put> lst = e.getValue();
+          Integer result = null;
+          if (resp != null)
+            result = resp.getAnswer(region);
+
+          if (result == null) {
+            // failed
+            LOG.debug("Failed all for region: " +
+                Bytes.toStringBinary(region) + ", removing from cache");
+            deleteCachedLocation(tableName, lst.get(0).getRow(), request.address);
+
+            failed.addAll(e.getValue());
+          } else if (result != HConstants.MULTIPUT_SUCCESS) {
+            // some failures
+            failed.addAll(lst.subList(result, lst.size()));
+            LOG.debug("Failed past " + result + " for region: " +
+                Bytes.toStringBinary(region) + ", removing from cache");
           }
         }
       }
@@ -2117,12 +2125,14 @@ public class HConnectionManager {
             }
             failedPuts.addAll(e.getValue());
 
-          } else if (result >= 0) {
+          } else if (result != HConstants.MULTIPUT_SUCCESS) {
             // Prepared the failed puts for retry
             if (failedPuts == null) {
               failedPuts = new ArrayList<Put>();
             }
             List<Put> lst = e.getValue();
+            // On a failure, the result is the (exclusive) index up to which the
+            // operations succeeded; everything from that index on is retried.
             failedPuts.addAll(lst.subList(result, lst.size()));
           } 
         }
@@ -2154,47 +2164,44 @@ public class HConnectionManager {
      */
     public void processBatchOfPuts(List<Put> list, final byte[] tableName, HBaseRPCOptions options)
     throws IOException {
-      boolean singletonList = list.size() == 1;
-      Throwable singleRowCause = null;
-      for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
+      long callStartTime;
+      callStartTime = System.currentTimeMillis();
+
+      int tries;
+      for ( tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
         List<Put> failed = new ArrayList<Put>();
-        if (singletonList) {
-          singleRowCause = this.processSinglePut(list.get(0), failed, tableName, options);
-        } else {
-          this.processBatchOfMultiPut(list, failed, tableName, options);
-        }
+        this.processBatchOfMultiPut(list, failed, tableName, options);
 
         list.clear();
         if (!failed.isEmpty()) {
-          // clear the cache for failed puts
-          for (Put failedPut: failed) {
-            deleteCachedLocation(tableName, failedPut.getRow());
-          }
-
           // retry the failed ones after sleep
           list.addAll(failed);
-          long sleepTime = getPauseTime(tries);
-          LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime +
+
+          // Do not sleep on the first two attempts (tries 0 and 1); the region might have moved.
+          if (tries <= 1) continue;
+
+          long pauseTime = getPauseTime(tries);
+          if ((System.currentTimeMillis() - callStartTime + pauseTime) >
+              rpcRetryTimeout) {
+            break; // we will throw a RetriesExhaustedException
+          }
+          LOG.debug("processBatchOfPuts had some failures, sleeping for " + pauseTime +
               " ms!");
           try {
-            Thread.sleep(sleepTime);
+            Thread.sleep(pauseTime);
           } catch (InterruptedException ignored) {
-            // ignore here
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
           }
         }
       }
 
       // Get exhausted after the retries
       if (!list.isEmpty()) {
-        if (singletonList && singleRowCause != null) {
-          throw new IOException("Exhaused retrying " + numRetries +
-              " and the last exception caught is " + singleRowCause);
-        }
-
         // ran out of retries and didnt succeed everything!
         throw new RetriesExhaustedException("Still had " + list.size() +
             " puts left after retrying " +
-            numRetries + " times.");
+            tries + " times, in " + (System.currentTimeMillis() - callStartTime) + " ms.");
       }
     }
 
@@ -2214,7 +2221,12 @@ public class HConnectionManager {
                 @Override
                 public void instantiateRegionLocation(boolean reload)
                     throws IOException {
-                  // location is already cached
+                  // we don't need to locate the region, since we have been given the address of the
+                  // server. But, let us store the information in this.location, so that
+                  // we can handle failures (i.e. clear cache) if we fail to connect to the RS.
+                  // (see HConnectionManager.getRegionServerWithRetries for how it is used to
+                  // call deleteCachedLocation.)
+                  this.location = new HRegionLocation(null, address);
                 }
                 @Override
                 public void instantiateServer() throws IOException {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Wed Jul 25 18:32:54 2012
@@ -341,12 +341,12 @@ public class HTableMultiplexer {
       return queue.size() + currentProcessingPutCount.get();
     }
 
-    private boolean resubmitFailedPut(PutStatus failedPutStatus) throws IOException{
+    private boolean resubmitFailedPut(PutStatus failedPutStatus, HServerAddress oldLoc) throws IOException{
       Put failedPut = failedPutStatus.getPut();
       // The currentPut is failed. So get the table name for the currentPut.
       byte[] tableName = failedPutStatus.getRegionInfo().getTableDesc().getName();
       // Clear the cached location for the failed puts
-      this.connection.deleteCachedLocation(tableName, failedPut.getRow());
+      this.connection.deleteCachedLocation(tableName, failedPut.getRow(), oldLoc);
       // Decrease the retry count
       int retryCount = failedPutStatus.getRetryCount() - 1;
       
@@ -409,14 +409,15 @@ public class HTableMultiplexer {
               if (failed.size() == processingList.size()) {
                 // All the puts for this region server are failed. Going to retry it later
                 for (PutStatus putStatus: processingList) {
-                  if (!resubmitFailedPut(putStatus)) {
+                  if (!resubmitFailedPut(putStatus, this.addr)) {
                     failedCount++;
                   }
                 }
               } else {
                 Set<Put> failedPutSet = new HashSet<Put>(failed);
                 for (PutStatus putStatus: processingList) {
-                  if (failedPutSet.contains(putStatus.getPut()) && !resubmitFailedPut(putStatus)) {
+                  if (failedPutSet.contains(putStatus.getPut())
+                      && !resubmitFailedPut(putStatus, this.addr)) {
                     failedCount++;
                   }
                 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Wed Jul 25 18:32:54 2012
@@ -37,7 +37,6 @@ import java.io.IOException;
  */
 public class ScannerCallable extends ServerCallable<Result[]> {
   private long scannerId = -1L;
-  private boolean instantiated = false;
   private boolean closed = false;
   private Scan scan;
   private int caching = 1;
@@ -53,25 +52,6 @@ public class ScannerCallable extends Ser
     this.scan = scan;
   }
 
-  @Override
-  public void instantiateRegionLocation(boolean reload) throws IOException {
-    if (!instantiated || reload) {
-      super.instantiateRegionLocation(reload);
-      instantiated = false;
-    }
-  }
-  /**
-   * @param reload force reload of server location
-   * @throws IOException
-   */
-  @Override
-  public void instantiateServer() throws IOException {
-    if (!instantiated) {
-      super.instantiateServer();
-      instantiated = true;
-    }
-  }
-
   /**
    * @see java.util.concurrent.Callable#call()
    */
@@ -137,7 +117,7 @@ public class ScannerCallable extends Ser
    * @return the HRegionInfo for the current region
    */
   public HRegionInfo getHRegionInfo() {
-    if (!instantiated) {
+    if (location == null) {
       return null;
     }
     return location.getRegionInfo();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Jul 25 18:32:54 2012
@@ -2016,15 +2016,8 @@ public class HRegion implements HeapSize
 
         // Check the families in the put. If bad, skip this one.
         if (op instanceof Put) {
-          try {
-            checkFamilies(op.getFamilyMap().keySet());
-            checkTimestamps(op, now);
-          } catch (DoNotRetryIOException dnrioe) {
-            LOG.warn("Sanity check error in batch processing", dnrioe);
-            batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.SANITY_CHECK_FAILURE;
-            lastIndexExclusive++;
-            continue;
-          }
+          checkFamilies(op.getFamilyMap().keySet());
+          checkTimestamps(op, now);
         }
 
         // If we haven't got any rows in our batch, we should block to

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Wed Jul 25 18:32:54 2012
@@ -58,7 +58,7 @@ public class TestHCM {
     HConnectionManager.TableServers conn =
         (HConnectionManager.TableServers) table.getConnection();
     assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
-    conn.deleteCachedLocation(TABLE_NAME, ROW);
+    conn.deleteCachedLocation(TABLE_NAME, ROW, null);
     HRegionLocation rl = conn.getCachedLocation(TABLE_NAME, ROW);
     assertNull("What is this location?? " + rl, rl);
   }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1365690&r1=1365689&r2=1365690&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Jul 25 18:32:54 2012
@@ -376,14 +376,18 @@ public class TestHRegion extends HBaseTe
 
     LOG.info("Next a batch put with one invalid family");
     puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
-    codes = this.region.put(puts);
-    assertEquals(10, codes.length);
-    for (int i = 0; i < 10; i++) {
-      assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :
-        OperationStatusCode.SUCCESS, codes[i]);
+    try {
+      codes = this.region.put(puts);
+
+      // put should throw an exception
+      assertTrue(false);
+    } catch(IOException e) {
+      // continue
     }
-    assertEquals(1, HLog.getSyncTime().count);
 
+    // let make the puts good again.
+    puts[5] = new Put(Bytes.toBytes("row_" + 5));
+    puts[5].add(cf, qual, val);
     LOG.info("Next a batch put that has to break into two batches to avoid a lock");
     Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2"));
 
@@ -417,8 +421,7 @@ public class TestHRegion extends HBaseTe
     assertEquals(1, HLog.getSyncTime().count);
     codes = retFromThread.get();
     for (int i = 0; i < 10; i++) {
-      assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :
-        OperationStatusCode.SUCCESS, codes[i]);
+      assertEquals(OperationStatusCode.SUCCESS, codes[i]);
     }
 
     LOG.info("Nexta, a batch put which uses an already-held lock");
@@ -434,8 +437,7 @@ public class TestHRegion extends HBaseTe
     codes = region.batchMutateWithLocks(putsAndLocks.toArray(new Pair[0]), "multiput_");
     LOG.info("...performed put");
     for (int i = 0; i < 10; i++) {
-      assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :
-        OperationStatusCode.SUCCESS, codes[i]);
+      assertEquals(OperationStatusCode.SUCCESS, codes[i]);
     }
     // Make sure we didn't do an extra batch
     assertEquals(1, HLog.getSyncTime().count);
@@ -2775,24 +2777,24 @@ public class TestHRegion extends HBaseTe
     int version = 0;
 
     for (int f =0 ; f < num_storefiles; f++) {
-	for (int i = 0; i < duplicate_multiplier; i ++) {
-		for (int j = 0; j < num_unique_rows; j++) {
-			Put put = new Put(Bytes.toBytes("row" + j));
-			put.add(fam1, qf1, version++, val1);
+      for (int i = 0; i < duplicate_multiplier; i ++) {
+        for (int j = 0; j < num_unique_rows; j++) {
+          Put put = new Put(Bytes.toBytes("row" + j));
+          put.add(fam1, qf1, version++, val1);
           region.put(put);
-		}
-	    }
-	    region.flushcache();
+        }
+      }
+      region.flushcache();
     }
     //before compaction
     Store store = region.getStore(fam1);
     List<StoreFile> storeFiles = store.getStorefiles();
     for (StoreFile storefile : storeFiles) {
-	StoreFile.Reader reader = storefile.getReader();
-	reader.loadFileInfo();
+      StoreFile.Reader reader = storefile.getReader();
+      reader.loadFileInfo();
       reader.loadBloomfilter();
       assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
-	assertEquals(num_unique_rows, reader.getFilterEntries());
+      assertEquals(num_unique_rows, reader.getFilterEntries());
     }
 
     region.compactStores(true);
@@ -2800,10 +2802,10 @@ public class TestHRegion extends HBaseTe
     //after compaction
     storeFiles = store.getStorefiles();
     for (StoreFile storefile : storeFiles) {
-	StoreFile.Reader reader = storefile.getReader();
-	reader.loadFileInfo();
+      StoreFile.Reader reader = storefile.getReader();
+      reader.loadFileInfo();
       reader.loadBloomfilter();
-	assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles,
+      assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles,
           reader.getEntries());
       assertEquals(num_unique_rows, reader.getFilterEntries());
     }